Rob,
I noticed the following in AMQSession_0_8.java:
+ public void sync() throws AMQException
+ {
+ declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
+ }
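It's quite possible that I misunderstood the intent here, but implementing
sync() as a declare of amq.direct looked odd to me. Is the declare there only
to force a synchronous round trip to the broker?
I thought I'd ask just to be sure.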
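To make the question concrete, here is a minimal sketch of what I assume the
intent is: a request that blocks until the broker has handled everything sent
before it. This is not Qpid code. SyncSketch, send() and processed() are
hypothetical names, and a single-threaded executor stands in for a broker that
handles frames in order.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a session; none of this is the Qpid client API.
public class SyncSketch
{
    // One worker thread models a broker that handles requests strictly in order.
    private final ExecutorService _broker = Executors.newSingleThreadExecutor();
    private final AtomicInteger _processed = new AtomicInteger();

    // Asynchronous send: queues the message and returns immediately.
    public void send(String message)
    {
        _broker.execute(() -> _processed.incrementAndGet());
    }

    // Synchronous no-op: returns only once every earlier request has been handled.
    public void sync()
    {
        try
        {
            _broker.submit(() -> { }).get();
        }
        catch (InterruptedException | ExecutionException e)
        {
            throw new IllegalStateException("sync failed", e);
        }
    }

    // Number of messages the simulated broker has handled so far.
    public int processed()
    {
        return _processed.get();
    }

    public void close()
    {
        _broker.shutdown();
    }

    public static void main(String[] args)
    {
        SyncSketch session = new SyncSketch();
        for (int i = 0; i < 5; i++)
        {
            session.send("msg " + i);
        }
        session.sync();
        System.out.println(session.processed()); // prints 5
        session.close();
    }
}
```

If that is all sync() needs to do on 0-8, then the exchange declare is just a
convenient request that waits for a reply, and any other synchronous method
would presumably serve equally well.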
Rajith
On Thu, Jan 14, 2010 at 11:53 AM, <[email protected]> wrote:
> Author: rgodfrey
> Date: Thu Jan 14 16:53:21 2010
> New Revision: 899296
>
> URL: http://svn.apache.org/viewvc?rev=899296&view=rev
> Log:
> QPID-2340 : Fix ProducerFlowControlTest to call a synchronous operation
> between sends (merged from 0.5-dev)
>
> Modified:
> qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
> qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
> qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=899296&r1=899295&r2=899296&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Jan 14 16:53:21 2010
> @@ -1339,6 +1339,8 @@
> declareExchange(name, type, getProtocolHandler(), nowait);
> }
>
> + abstract public void sync() throws AMQException;
> +
> public int getAcknowledgeMode() throws JMSException
> {
> checkNotClosed();
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=899296&r1=899295&r2=899296&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Jan 14 16:53:21 2010
> @@ -922,6 +922,11 @@
> {
> return Serial.lt((int) currentMark, (int) deliveryTag);
> }
> +
> + public void sync() throws AMQException
> + {
> + _qpidSession.sync();
> + }
>
> public AMQMessageDelegateFactory getMessageDelegateFactory()
> {
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=899296&r1=899295&r2=899296&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Jan 14 16:53:21 2010
> @@ -584,5 +584,10 @@
> {
> return AMQMessageDelegateFactory.FACTORY_0_8;
> }
> +
> + public void sync() throws AMQException
> + {
> + declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
> + }
>
> }
>
> Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=899296&r1=899295&r2=899296&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
> +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Thu Jan 14 16:53:21 2010
> @@ -409,8 +409,7 @@
> consumer.receive();
>
> //perform a synchronous op on the connection
> - ((AMQSession) consumerSession).declareExchange(
> - new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
> + ((AMQSession) consumerSession).sync();
>
> assertFalse("Queue should not be overfull",
> queueMBean.isFlowOverfull());
>
> @@ -436,12 +435,15 @@
> producer.send(nextMessage(msg, producerSession));
> _sentMessages.incrementAndGet();
>
> +
> try
> {
> - Thread.sleep(sleepPeriod);
> + ((AMQSession)producerSession).sync();
> }
> - catch (InterruptedException e)
> + catch (AMQException e)
> {
> + e.printStackTrace();
> + throw new RuntimeException(e);
> }
> }
> }
> @@ -495,4 +497,4 @@
> return _exception;
> }
> }
> -}
> \ No newline at end of file
> +}
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project: http://qpid.apache.org
> Use/Interact: mailto:[email protected]
>
>
--
Regards,
Rajith Attapattu
Red Hat
http://rajith.2rlabs.com/