Author: rgodfrey
Date: Thu Jan 14 15:54:05 2010
New Revision: 899259
URL: http://svn.apache.org/viewvc?rev=899259&view=rev
Log:
QPID-2340 : Fix ProducerFlowControlTest to call a synchronous operation between
sends
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=899259&r1=899258&r2=899259&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Jan 14 15:54:05 2010
@@ -1340,6 +1340,8 @@
declareExchange(name, type, getProtocolHandler(), nowait);
}
+ abstract public void sync() throws AMQException;
+
public int getAcknowledgeMode() throws JMSException
{
checkNotClosed();
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=899259&r1=899258&r2=899259&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Thu Jan 14 15:54:05 2010
@@ -922,6 +922,11 @@
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
+
+ public void sync() throws AMQException
+ {
+ _qpidSession.sync();
+ }
public AMQMessageDelegateFactory getMessageDelegateFactory()
{
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=899259&r1=899258&r2=899259&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Thu Jan 14 15:54:05 2010
@@ -578,5 +578,10 @@
{
return AMQMessageDelegateFactory.FACTORY_0_8;
}
+
+ public void sync() throws AMQException
+ {
+ declareExchange(new AMQShortString("amq.direct"), new
AMQShortString("direct"), false);
+ }
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=899259&r1=899258&r2=899259&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
Thu Jan 14 15:54:05 2010
@@ -206,7 +206,7 @@
Thread.sleep(10000);
List<String> results = _monitor.findMatches("Message send delayed by");
- assertEquals("Incorrect number of delay messages logged by
client",3,results.size());
+ assertTrue("Incorrect number of delay messages logged by client -
expect at least 3, got " + results.size(),3 <= results.size());
results = _monitor.findMatches("Message send failed due to timeout
waiting on broker enforced flow control");
assertEquals("Incorrect number of send failure messages logged by
client",1,results.size());
@@ -407,8 +407,7 @@
consumer.receive();
//perform a synchronous op on the connection
- ((AMQSession) consumerSession).declareExchange(
- new AMQShortString("amq.direct"), new
AMQShortString("direct"), false);
+ ((AMQSession) consumerSession).sync();
assertFalse("Queue should not be overfull",
queueMBean.isFlowOverfull());
@@ -434,12 +433,15 @@
producer.send(nextMessage(msg, producerSession));
_sentMessages.incrementAndGet();
+
try
{
- Thread.sleep(sleepPeriod);
+ ((AMQSession)producerSession).sync();
}
- catch (InterruptedException e)
+ catch (AMQException e)
{
+ e.printStackTrace();
+ throw new RuntimeException(e);
}
}
}
@@ -493,4 +495,4 @@
return _exception;
}
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]