Author: rajith
Date: Fri Jan 25 21:32:42 2013
New Revision: 1438725

URL: http://svn.apache.org/viewvc?rev=1438725&view=rev
Log:
QPID-4541 Added the ability to turn off replay at the time the session is created.
The XASessionImpl will use this feature to turn off replay on the underlying JMS session.
This prevents messages being replayed outside the boundaries of the XA transaction.

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1438725&r1=1438724&r2=1438725&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Fri Jan 25 21:32:42 2013 @@ -88,7 +88,7 @@ public class XASessionImpl extends AMQSe */ public void createSession() { - _qpidDtxSession = getQpidConnection().createSession(0); + _qpidDtxSession = getQpidConnection().createSession(0,true); _qpidDtxSession.setSessionListener(this); _qpidDtxSession.dtxSelect(); } Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1438725&r1=1438724&r2=1438725&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original) +++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Fri Jan 25 21:32:42 2013 @@ -597,7 +597,7 @@ public class AMQSession_0_10Test extends connection.setSessionFactory(new SessionFactory() { - public Session newSession(Connection conn, Binary name, long expiry) + public Session newSession(Connection conn, Binary name, long 
expiry, boolean isNoReplay) { return new MockSession(conn, new SessionDelegate(), name, expiry, throwException); } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1438725&r1=1438724&r2=1438725&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Jan 25 21:32:42 2013 @@ -86,15 +86,15 @@ public class Connection extends Connecti public static interface SessionFactory { - Session newSession(Connection conn, Binary name, long expiry); + Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay); } private static final class DefaultSessionFactory implements SessionFactory { - public Session newSession(final Connection conn, final Binary name, final long expiry) + public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay) { - return new Session(conn, name, expiry); + return new Session(conn, name, expiry, isNoReplay); } } @@ -296,7 +296,12 @@ public class Connection extends Connecti public Session createSession(long expiry) { - return createSession(UUID.randomUUID().toString(), expiry); + return createSession(expiry, false); + } + + public Session createSession(long expiry, boolean isNoReplay) + { + return createSession(UUID.randomUUID().toString(), expiry, isNoReplay); } public Session createSession(String name) @@ -309,6 +314,11 @@ public class Connection extends Connecti return createSession(Strings.toUTF8(name), expiry); } + public Session createSession(String name, long expiry,boolean isNoReplay) + { + return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay); + } + public Session 
createSession(byte[] name, long expiry) { return createSession(new Binary(name), expiry); @@ -316,6 +326,11 @@ public class Connection extends Connecti public Session createSession(Binary name, long expiry) { + return createSession(name, expiry, false); + } + + public Session createSession(Binary name, long expiry, boolean isNoReplay) + { synchronized (lock) { Waiter w = new Waiter(lock, timeout); @@ -329,7 +344,7 @@ public class Connection extends Connecti throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); } - Session ssn = _sessionFactory.newSession(this, name, expiry); + Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay); registerSession(ssn); map(ssn); ssn.attach(); Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1438725&r1=1438724&r2=1438725&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Jan 25 21:32:42 2013 @@ -131,19 +131,31 @@ public class Session extends SessionInvo private final Object stateLock = new Object(); private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + private boolean _isNoReplay = false; protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); } + protected Session(Connection connection, Binary name, long expiry, boolean noReplay) + { + this(connection, new SessionDelegate(), name, expiry, noReplay); + } + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { + this(connection, delegate, name, expiry,false); + } + + protected Session(Connection connection, 
SessionDelegate delegate, Binary name, long expiry, boolean noReplay) + { this.connection = connection; this.delegate = delegate; this.name = name; this.expiry = expiry; this.closing = false; + this._isNoReplay = noReplay; initReceiver(); } @@ -281,6 +293,7 @@ public class Session extends SessionInvo void resume() { _failoverRequired.set(false); + synchronized (commandsLock) { attach(); @@ -739,7 +752,7 @@ public class Session extends SessionInvo sessionCommandPoint(0, 0); } - boolean replayTransfer = !closing && !transacted && + boolean replayTransfer = !_isNoReplay && !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org