Author: rajith
Date: Fri Jan 25 21:32:42 2013
New Revision: 1438725

URL: http://svn.apache.org/viewvc?rev=1438725&view=rev
Log:
QPID-4541 Added the ability to turn off replay at the time the session
is created, via a new isNoReplay flag on Connection.createSession. The
XASessionImpl uses this feature to turn off replay on the underlying
JMS session. This prevents messages from being replayed outside the
boundaries of the XA transaction.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
    
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1438725&r1=1438724&r2=1438725&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
 Fri Jan 25 21:32:42 2013
@@ -88,7 +88,7 @@ public class XASessionImpl extends AMQSe
      */
     public void createSession()
     {
-        _qpidDtxSession = getQpidConnection().createSession(0);
+        _qpidDtxSession = getQpidConnection().createSession(0,true);
         _qpidDtxSession.setSessionListener(this);
         _qpidDtxSession.dtxSelect();
     }

Modified: 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1438725&r1=1438724&r2=1438725&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
 Fri Jan 25 21:32:42 2013
@@ -597,7 +597,7 @@ public class AMQSession_0_10Test extends
         connection.setSessionFactory(new SessionFactory()
         {
 
-            public Session newSession(Connection conn, Binary name, long 
expiry)
+            public Session newSession(Connection conn, Binary name, long 
expiry, boolean isNoReplay)
             {
                 return new MockSession(conn, new SessionDelegate(), name, 
expiry, throwException);
             }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1438725&r1=1438724&r2=1438725&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
 Fri Jan 25 21:32:42 2013
@@ -86,15 +86,15 @@ public class Connection extends Connecti
 
     public static interface SessionFactory
     {
-        Session newSession(Connection conn, Binary name, long expiry);
+        Session newSession(Connection conn, Binary name, long expiry, boolean 
isNoReplay);
     }
 
     private static final class DefaultSessionFactory implements SessionFactory
     {
 
-        public Session newSession(final Connection conn, final Binary name, 
final long expiry)
+        public Session newSession(final Connection conn, final Binary name, 
final long expiry, final boolean isNoReplay)
         {
-            return new Session(conn, name, expiry);
+            return new Session(conn, name, expiry, isNoReplay);
         }
     }
 
@@ -296,7 +296,12 @@ public class Connection extends Connecti
 
     public Session createSession(long expiry)
     {
-        return createSession(UUID.randomUUID().toString(), expiry);
+        return createSession(expiry, false);
+    }
+
+    public Session createSession(long expiry, boolean isNoReplay)
+    {
+        return createSession(UUID.randomUUID().toString(), expiry, isNoReplay);
     }
 
     public Session createSession(String name)
@@ -309,6 +314,11 @@ public class Connection extends Connecti
         return createSession(Strings.toUTF8(name), expiry);
     }
 
+    public Session createSession(String name, long expiry,boolean isNoReplay)
+    {
+        return createSession(new Binary(Strings.toUTF8(name)), expiry, 
isNoReplay);
+    }
+
     public Session createSession(byte[] name, long expiry)
     {
         return createSession(new Binary(name), expiry);
@@ -316,6 +326,11 @@ public class Connection extends Connecti
 
     public Session createSession(Binary name, long expiry)
     {
+        return createSession(name, expiry, false);
+    }
+
+    public Session createSession(Binary name, long expiry, boolean isNoReplay)
+    {
         synchronized (lock)
         {
             Waiter w = new Waiter(lock, timeout);
@@ -329,7 +344,7 @@ public class Connection extends Connecti
                 throw new ConnectionException("Timed out waiting for 
connection to be ready. Current state is :" + state);
             }
 
-            Session ssn = _sessionFactory.newSession(this, name, expiry);
+            Session ssn = _sessionFactory.newSession(this, name, expiry, 
isNoReplay);
             registerSession(ssn);
             map(ssn);
             ssn.attach();

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1438725&r1=1438724&r2=1438725&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 Fri Jan 25 21:32:42 2013
@@ -131,19 +131,31 @@ public class Session extends SessionInvo
     private final Object stateLock = new Object();
 
     private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+    private boolean _isNoReplay = false;
 
     protected Session(Connection connection, Binary name, long expiry)
     {
         this(connection, new SessionDelegate(), name, expiry);
     }
 
+    protected Session(Connection connection, Binary name, long expiry, boolean 
noReplay)
+    {
+        this(connection, new SessionDelegate(), name, expiry, noReplay);
+    }
+
     protected Session(Connection connection, SessionDelegate delegate, Binary 
name, long expiry)
     {
+        this(connection, delegate, name, expiry,false);
+    }
+
+    protected Session(Connection connection, SessionDelegate delegate, Binary 
name, long expiry, boolean noReplay)
+    {
         this.connection = connection;
         this.delegate = delegate;
         this.name = name;
         this.expiry = expiry;
         this.closing = false;
+        this._isNoReplay = noReplay;
         initReceiver();
     }
 
@@ -281,6 +293,7 @@ public class Session extends SessionInvo
     void resume()
     {
         _failoverRequired.set(false);
+
         synchronized (commandsLock)
         {
             attach();
@@ -739,7 +752,7 @@ public class Session extends SessionInvo
                     sessionCommandPoint(0, 0);
                 }
 
-                boolean replayTransfer = !closing && !transacted &&
+                boolean replayTransfer = !_isNoReplay && !closing && 
!transacted &&
                                          m instanceof MessageTransfer &&
                                          ! m.isUnreliable();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to