Author: kwall
Date: Thu Sep 13 20:23:21 2012
New Revision: 1384512
URL: http://svn.apache.org/viewvc?rev=1384512&view=rev
Log:
QPID-4302: 0-8..0-9-1 client should sync after message.acknowledge()
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1384512&r1=1384511&r2=1384512&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Thu Sep 13 20:23:21 2012
@@ -64,6 +64,11 @@ public class AMQSession_0_8 extends AMQS
/** Used for debugging. */
private static final Logger _logger =
LoggerFactory.getLogger(AMQSession.class);
+ public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack";
+
+ private final boolean _syncAfterClientAck =
+ Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true"));
+
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
@@ -120,8 +125,9 @@ public class AMQSession_0_8 extends AMQS
return getProtocolHandler().getProtocolVersion();
}
- protected void acknowledgeImpl()
+ protected void acknowledgeImpl() throws JMSException
{
+ boolean syncRequired = false;
while (true)
{
Long tag = getUnacknowledgedMessageTags().poll();
@@ -131,6 +137,19 @@ public class AMQSession_0_8 extends AMQS
}
acknowledgeMessage(tag, false);
+ syncRequired = true;
+ }
+
+ try
+ {
+ if (syncRequired && _syncAfterClientAck)
+ {
+ sync();
+ }
+ }
+ catch (AMQException a)
+ {
+ throw new JMSAMQException("Failed to sync after acknowledge", a);
}
}
@@ -681,7 +700,7 @@ public class AMQSession_0_8 extends AMQS
boolean noLocal,
boolean noWait) throws AMQException
{
- throw new UnsupportedOperationException("The new addressing based sytanx is "
+ throw new UnsupportedOperationException("The new addressing based syntax is "
+ "not supported for AMQP 0-8/0-9 versions");
}
Modified:
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1384512&r1=1384511&r2=1384512&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
(original)
+++
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
Thu Sep 13 20:23:21 2012
@@ -189,14 +189,6 @@ public class TestAMQSession extends AMQS
{
}
- public void handleAddressBasedDestination(AMQDestination dest,
- boolean isConsumer,
- boolean noWait) throws AMQException
- {
- throw new UnsupportedOperationException("The new addressing based sytanx is "
- + "not supported for AMQP 0-8/0-9 versions");
- }
-
@Override
protected void flushAcknowledgments()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]