Author: rajith
Date: Tue Jan 11 02:58:14 2011
New Revision: 1057460
URL: http://svn.apache.org/viewvc?rev=1057460&view=rev
Log:
QPID-2994
When the session is marked transactional, message transfers are not replayed.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1057460&r1=1057459&r2=1057460&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Jan 11 02:58:14 2011
@@ -167,6 +167,7 @@ public class AMQSession_0_10 extends AMQ
if (_transacted)
{
_qpidSession.txSelect();
+ _qpidSession.setTransacted(true);
}
if (maxAckDelay > 0)
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1057460&r1=1057459&r2=1057460&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Jan 11 02:58:14 2011
@@ -89,7 +89,7 @@ public class Session extends SessionInvo
private int channel;
private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
- private long timeout = 60000;
+ private long timeout = 6000;
private boolean autoSync = false;
private boolean incomingInit;
@@ -116,7 +116,8 @@ public class Session extends SessionInvo
private Semaphore credit = new Semaphore(0);
private Thread resumer = null;
-
+ private boolean transacted = false;
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -645,7 +646,7 @@ public class Session extends SessionInvo
{
sessionCommandPoint(0, 0);
}
- if ((!closing && m instanceof MessageTransfer) || m.hasCompletionListener())
+ if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
@@ -992,4 +993,9 @@ public class Session extends SessionInvo
{
return String.format("ssn:%s", name);
}
+
+ public void setTransacted(boolean b) {
+ this.transacted = b;
+ }
+
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]