Author: ritchiem
Date: Mon Oct 5 15:01:17 2009
New Revision: 821821
URL: http://svn.apache.org/viewvc?rev=821821&view=rev
Log:
Updated testing harness: QpidTestCase and FailoverBaseCase.
Updates to QpidTestCase:
to allow the sending of messages that are tagged from an offset value.
to correctly commit the sent messages if the sent number does not fit within
the batch window.
update to createMessage to add an INDEX int property.
update to rename ssl connectionfactory default.ssl to align with other
factories. updated test-provider accordingly.
added a getTestQueue method that creates an AMQQueue using getTestQueueName
to further simplify tests.
Updates to FailoverBaseCase
removed setFailingPort, failBroker now takes the port to fail, this allows
both brokers to be failed more easily.
Update to ensure that all subclasses get a failover connection url, through
the getConnection(), some were not. Fixed by overriding getConnectionFactory()
call.
Fixed second broker work directory to write to defined QPID_WORK rather than
java.io.tmpdir. If QPID_WORK is not set then tests will fall back to
java.io.tmpdir
getFailingPort now returns the correct value if you are running InVM (2)or
externally(test.alt.port)
Updated tests that used failBroker to use failBroker(getFailingPort()) so tests
will work InVM and externally.
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
qpid/trunk/qpid/java/test-profiles/test-provider.properties
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java?rev=821821&r1=821820&r2=821821&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
Mon Oct 5 15:01:17 2009
@@ -63,7 +63,6 @@
*/
public class DeepQueueConsumeWithSelector extends QpidTestCase implements
MessageListener
{
- private static final String INDEX = "index";
private static final int MESSAGE_COUNT = 10000;
private static final int BATCH_SIZE = MESSAGE_COUNT / 10;
@@ -129,9 +128,7 @@
@Override
public Message createNextMessage(Session session, int msgCount) throws
JMSException
{
- Message message = session.createTextMessage("Message :" + msgCount);
-
- message.setIntProperty(INDEX, msgCount);
+ Message message = super.createNextMessage(session,msgCount);
if ((msgCount % BATCH_SIZE) == 0 )
{
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=821821&r1=821820&r2=821821&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
Mon Oct 5 15:01:17 2009
@@ -497,7 +497,7 @@
if (msgCount == failPoint)
{
- failBroker();
+ failBroker(getFailingPort());
}
}
@@ -529,7 +529,7 @@
sendMessages("connection2", messages);
}
- failBroker();
+ failBroker(getFailingPort());
checkQueueDepth(messages);
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=821821&r1=821820&r2=821821&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
Mon Oct 5 15:01:17 2009
@@ -37,7 +37,6 @@
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -58,13 +57,12 @@
private Session consumerSession;
private MessageConsumer consumer;
- private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
- private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
private int seed;
private Random rand;
-
+ private int _currentPort = getFailingPort();
+
@Override
protected void setUp() throws Exception
{
@@ -227,7 +225,7 @@
_logger.info("Failing over");
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
// Check that you produce and consume the rest of messages.
_logger.debug("==================");
@@ -242,10 +240,10 @@
_logger.debug("==================");
}
- private void causeFailure(long delay)
+ private void causeFailure(int port, long delay)
{
- failBroker();
+ failBroker(port);
_logger.info("Awaiting Failover completion");
try
@@ -268,7 +266,7 @@
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -314,7 +312,7 @@
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
- causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() >
failTime);
@@ -344,15 +342,15 @@
_logger.debug("===================================================================");
runP2PFailover(numMessages, false,false, false);
- startBroker(getFailingPort());
+ startBroker(_currentPort);
if (useAltPort)
{
- setFailingPort(altPort);
+ _currentPort = altPort;
useAltPort = false;
}
else
{
- setFailingPort(stdPort);
+ _currentPort = stdPort;
useAltPort = true;
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=821821&r1=821820&r2=821821&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Mon Oct 5 15:01:17 2009
@@ -23,8 +23,7 @@
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.test.utils.QpidTestCase;
-
+import org.apache.qpid.test.utils.FailoverBaseCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,16 +34,21 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
-
import java.util.concurrent.atomic.AtomicInteger;
-public class RecoverTest extends QpidTestCase
+public class RecoverTest extends FailoverBaseCase
{
- private static final Logger _logger =
LoggerFactory.getLogger(RecoverTest.class);
+ static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
private Exception _error;
private AtomicInteger count;
+ protected AMQConnection _connection;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ static final int SENT_COUNT = 4;
+
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -52,134 +56,110 @@
count = new AtomicInteger();
}
- protected void tearDown() throws Exception
+ protected void initTest() throws Exception
{
- super.tearDown();
- count = null;
- }
+ _connection = (AMQConnection) getConnection("guest", "guest");
- public void testRecoverResendsMsgs() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- Session consumerSession = con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new
AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession)
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ _consumerSession = _connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer = _consumerSession.createConsumer(queue);
_logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
+ sendMessage(_connection.createSession(false,
Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
_logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. Calling recover with three
outstanding messages");
- // no ack for last three messages so when I call recover I expect to
get three messages back
- consumerSession.recover();
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg2", tm.getText());
+ _connection.start();
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm.getText());
+ protected Message validateNextMessages(int nextCount, int startIndex)
throws JMSException
+ {
+ Message message = null;
+ for (int index = 0; index < nextCount; index++)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(startIndex + index, message.getIntProperty(INDEX));
+ }
+ return message;
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm.getText());
+ protected void validateRemainingMessages(int remaining) throws JMSException
+ {
+ int index = SENT_COUNT - remaining;
- _logger.info("Received redelivery of three messages. Acknowledging
last message");
- tm.acknowledge();
+ Message message = null;
+ while (index != SENT_COUNT)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(index++, message.getIntProperty(INDEX));
+ }
+
+ if (message != null)
+ {
+ _logger.info("Received redelivery of three messages. Acknowledging
last message");
+ message.acknowledge();
+ }
_logger.info("Calling acknowledge with no outstanding messages");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
+ message = _consumer.receiveNoWait();
+ assertNull(message);
_logger.info("No messages redelivered as is expected");
-
- con.close();
}
- public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ public void testRecoverResendsMsgs() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ initTest();
- Session consumerSession = con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new
AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession)
consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ Message message = validateNextMessages(1, 0);
+ message.acknowledge();
+ _logger.info("Received and acknowledged first message");
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
+ _logger.info("Received all four messages. Calling recover with three
outstanding messages");
+ // no ack for last three messages so when I call recover I expect to
get three messages back
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _consumerSession.recover();
- con2.close();
+ validateRemainingMessages(3);
+ }
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- consumer.receive();
- tm.acknowledge();
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ initTest();
+
+ Message message = validateNextMessages(2, 0);
+ message.acknowledge();
_logger.info("Received 2 messages, acknowledge() first message, should
acknowledge both");
- consumer.receive();
- consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
_logger.info("Received all four messages. Calling recover with two
outstanding messages");
// no ack for last three messages so when I call recover I expect to
get three messages back
- consumerSession.recover();
- TextMessage tm3 = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm3.getText());
+ _consumerSession.recover();
+
+ Message message2 = _consumer.receive(3000);
+ assertEquals(2, message2.getIntProperty(INDEX));
- TextMessage tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
+ Message message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
_logger.info("Received redelivery of two messages. calling
acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) message2).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
+ message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
+ ((org.apache.qpid.jms.Message) message3).acknowledgeThis();
- _logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
-
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
-
- con.close();
+ validateRemainingMessages(0);
}
public void testAcknowledgePerConsumer() throws Exception
@@ -188,11 +168,11 @@
Session consumerSession = con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new
AMQShortString("Q1"), new AMQShortString("Q1"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(),
new AMQShortString("Q1"), new AMQShortString("Q1"),
+ false, true);
Queue queue2 =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new
AMQShortString("Q2"), new AMQShortString("Q2"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(),
new AMQShortString("Q2"), new AMQShortString("Q2"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
@@ -231,8 +211,8 @@
final Session consumerSession = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new
AMQShortString("Q3"), new AMQShortString("Q3"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(),
new AMQShortString("Q3"), new AMQShortString("Q3"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
@@ -240,50 +220,50 @@
final Object lock = new Object();
consumer.setMessageListener(new MessageListener()
- {
+ {
- public void onMessage(Message message)
+ public void onMessage(Message message)
+ {
+ try
{
- try
+ count.incrementAndGet();
+ if (count.get() == 1)
{
- count.incrementAndGet();
- if (count.get() == 1)
+ if (message.getJMSRedelivered())
{
- if (message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception("Message marked as
redilvered on what should be first delivery attempt"));
- }
-
- consumerSession.recover();
}
- else if (count.get() == 2)
+
+ consumerSession.recover();
+ }
+ else if (count.get() == 2)
+ {
+ if (!message.getJMSRedelivered())
{
- if (!message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception(
- "Message not marked as redilvered on
what should be second delivery attempt"));
- }
- }
- else
- {
- System.err.println(message);
- fail("Message delivered too many times!: " +
count);
+ "Message not marked as redilvered
on what should be second delivery attempt"));
}
}
- catch (JMSException e)
+ else
{
- _logger.error("Error recovering session: " + e, e);
- setError(e);
+ System.err.println(message);
+ fail("Message delivered too many times!: " + count);
}
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ setError(e);
+ }
- synchronized (lock)
- {
- lock.notify();
- }
+ synchronized (lock)
+ {
+ lock.notify();
}
- });
+ }
+ });
con.start();
@@ -323,9 +303,4 @@
{
_error = e;
}
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(RecoverTest.class);
- }
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java?rev=821821&r1=821820&r2=821821&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
Mon Oct 5 15:01:17 2009
@@ -22,45 +22,41 @@
import org.apache.qpid.util.FileUtils;
-import javax.jms.Connection;
+import javax.naming.NamingException;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FailoverBaseCase extends QpidTestCase
{
-
-<<<<<<<
HEAD:qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
-=======
protected static final Logger _logger =
LoggerFactory.getLogger(FailoverBaseCase.class);
->>>>>>> be4ef1c... Update to FBC to ensure second broker is shutdown in the
event of an exception during super.tearDown. This may have been the cause of CI
stuck
brokers:qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
public static int FAILING_VM_PORT = 2;
public static int FAILING_PORT =
Integer.parseInt(System.getProperty("test.port.alt"));
+ public static final long DEFAULT_FAILOVER_TIME = 10000L;
protected int failingPort;
-
- private boolean failedOver = false;
- public FailoverBaseCase()
+ protected int getFailingPort()
{
if (_broker.equals(VM))
{
- failingPort = FAILING_VM_PORT;
+ return FAILING_VM_PORT;
}
else
{
- failingPort = FAILING_PORT;
+ return FAILING_PORT;
}
}
-
- protected int getFailingPort()
- {
- return failingPort;
- }
protected void setUp() throws java.lang.Exception
{
super.setUp();
- setSystemProperty("QPID_WORK",
System.getProperty("QPID_WORK")+"/"+getFailingPort());
- startBroker(failingPort);
+ // Set QPID_WORK to $QPID_WORK/<getFailingPort()>
+ // or /tmp/<getFailingPort()> if QPID_WORK not set.
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK",
System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ startBroker(getFailingPort());
}
/**
@@ -69,16 +65,25 @@
* @return a connection
* @throws Exception
*/
- public Connection getConnection() throws Exception
+ @Override
+ public AMQConnectionFactory getConnectionFactory() throws NamingException
{
- Connection conn =
- (Boolean.getBoolean("profile.use_ssl"))?
-
getConnectionFactory("failover.ssl").createConnection("guest", "guest"):
-
getConnectionFactory("failover").createConnection("guest", "guest");
- _connections.add(conn);
- return conn;
+ _logger.info("get ConnectionFactory");
+ if (_connectionFactory == null)
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ _connectionFactory = getConnectionFactory("failover.ssl");
+ }
+ else
+ {
+ _connectionFactory = getConnectionFactory("failover");
+ }
+ }
+ return _connectionFactory;
}
+
public void tearDown() throws Exception
{
try
@@ -95,24 +100,17 @@
}
- /**
- * Only used of VM borker.
- */
- public void failBroker()
+ public void failBroker(int port)
{
- failedOver = true;
try
{
- stopBroker(getFailingPort());
+ stopBroker(port);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
-
- protected void setFailingPort(int p)
- {
- failingPort = p;
- }
+
+
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=821821&r1=821820&r2=821821&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
Mon Oct 5 15:01:17 2009
@@ -22,8 +22,10 @@
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -81,6 +83,8 @@
private XMLConfiguration _testConfiguration = new XMLConfiguration();
+ protected static final String INDEX = "index";
+
/**
* Some tests are excluded when the property test.excludes is set to true.
* An exclusion list is either a file (prop test.excludesfile) which
contains one test name
@@ -183,7 +187,7 @@
private Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
private InitialContext _initialContext;
- private AMQConnectionFactory _connectionFactory;
+ protected AMQConnectionFactory _connectionFactory;
private String _testName;
@@ -193,8 +197,6 @@
public static final String TOPIC = "topic";
/** Map to hold test defined environment properties */
private Map<String, String> _env;
- protected static final String INDEX = "index";
- ;
public QpidTestCase(String name)
{
@@ -929,7 +931,7 @@
{
if (Boolean.getBoolean("profile.use_ssl"))
{
- _connectionFactory = getConnectionFactory("ssl");
+ _connectionFactory = getConnectionFactory("default.ssl");
}
else
{
@@ -1019,6 +1021,17 @@
return getClass().getSimpleName() + "-" + getName();
}
+ /**
+ * Return a Queue specific for this test.
+ * Uses getTestQueueName() as the name of the queue
+ * @return
+ */
+ public Queue getTestQueue()
+ {
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
getTestQueueName());
+ }
+
+
protected void tearDown() throws java.lang.Exception
{
try
@@ -1070,17 +1083,23 @@
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
- return sendMessage(session, destination, count, 0);
+ return sendMessage(session, destination, count, 0, 0);
}
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
+ return sendMessage(session, destination, count, 0, batchSize);
+ }
+
+ public List<Message> sendMessage(Session session, Destination destination,
+ int count, int offset, int batchSize)
throws Exception
+ {
List<Message> messages = new ArrayList<Message>(count);
MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++)
+ for (int i = offset; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
@@ -1099,8 +1118,11 @@
}
// Ensure we commit the last messages
- if (session.getTransacted() && (batchSize > 0) &&
- (count / batchSize != 0))
+ // Commit the session if we are transacted and
+ // we have no batchSize or
+ // our count is not divible by batchSize.
+ if (session.getTransacted() &&
+ ( batchSize == 0 || count % batchSize != 0))
{
session.commit();
}
@@ -1111,7 +1133,6 @@
public Message createNextMessage(Session session, int msgCount) throws
JMSException
{
Message message = session.createMessage();
-
message.setIntProperty(INDEX, msgCount);
return message;
Modified: qpid/trunk/qpid/java/test-profiles/test-provider.properties
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/test-provider.properties?rev=821821&r1=821820&r2=821821&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/test-provider.properties (original)
+++ qpid/trunk/qpid/java/test-profiles/test-provider.properties Mon Oct 5
15:01:17 2009
@@ -29,14 +29,13 @@
connectionfactory.default =
amqp://username:passw...@clientid/test?brokerlist='tcp://localhost:${test.port}'
+connectionfactory.default.ssl =
amqp://username:passw...@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.default.vm =
amqp://username:passw...@clientid/test?brokerlist='vm://:1'
-connectionfactory.ssl =
amqp://username:passw...@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.failover =
amqp://username:passw...@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.ssl =
amqp://username:passw...@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.vm =
amqp://username:passw...@clientid/test?brokerlist='vm://:2;vm://:1'
+
connectionfactory.connection1 =
amqp://username:passw...@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 =
amqp://username:passw...@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'
connectionfactory.connection1.vm =
amqp://username:passw...@clientid/test?brokerlist='vm://:1'
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]