Author: rajith
Date: Tue Jan 11 02:58:14 2011
New Revision: 1057460

URL: http://svn.apache.org/viewvc?rev=1057460&view=rev
Log:
QPID-2994
When the session is marked transactional, message transfers are not replayed.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1057460&r1=1057459&r2=1057460&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Tue Jan 11 02:58:14 2011
@@ -167,6 +167,7 @@ public class AMQSession_0_10 extends AMQ
         if (_transacted)
         {
             _qpidSession.txSelect();
+            _qpidSession.setTransacted(true);
         }
 
         if (maxAckDelay > 0)

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1057460&r1=1057459&r2=1057460&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 Tue Jan 11 02:58:14 2011
@@ -89,7 +89,7 @@ public class Session extends SessionInvo
     private int channel;
     private SessionDelegate delegate;
     private SessionListener listener = new DefaultSessionListener();
-    private long timeout = 60000;
+    private long timeout = 6000;
     private boolean autoSync = false;
 
     private boolean incomingInit;
@@ -116,7 +116,8 @@ public class Session extends SessionInvo
     private Semaphore credit = new Semaphore(0);
 
     private Thread resumer = null;
-
+    private boolean transacted = false;
+    
     protected Session(Connection connection, Binary name, long expiry)
     {
         this(connection, new SessionDelegate(), name, expiry);
@@ -645,7 +646,7 @@ public class Session extends SessionInvo
                 {
                     sessionCommandPoint(0, 0);
                 }
-                if ((!closing && m instanceof MessageTransfer) || 
m.hasCompletionListener())
+                if ((!closing && !transacted && m instanceof MessageTransfer) 
|| m.hasCompletionListener())
                 {
                     commands[mod(next, commands.length)] = m;
                     commandBytes += m.getBodySize();
@@ -992,4 +993,9 @@ public class Session extends SessionInvo
     {
         return String.format("ssn:%s", name);
     }
+    
+    public void setTransacted(boolean b) {
+        this.transacted = b;
+    }
+    
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to