Repository: qpid-jms
Updated Branches:
  refs/heads/master 6b6e1a76c -> 7e2c5702d
QPIDJMS-177 Ensure that sessions snapshot the policy objects they use on creation. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7e2c5702 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7e2c5702 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7e2c5702 Branch: refs/heads/master Commit: 7e2c5702d448b2517cdfc6bfb3ac3e2101299076 Parents: 6b6e1a7 Author: Timothy Bish <[email protected]> Authored: Tue May 24 17:05:00 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 24 17:05:00 2016 -0400 ---------------------------------------------------------------------- .../org/apache/qpid/jms/JmsMessageConsumer.java | 4 +- .../org/apache/qpid/jms/JmsMessageProducer.java | 2 +- .../java/org/apache/qpid/jms/JmsSession.java | 24 +++++++---- .../apache/qpid/jms/meta/JmsSessionInfo.java | 42 ++++++++++++++++++++ .../jms/integration/SessionIntegrationTest.java | 22 ++++++++++ 5 files changed, 84 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index fd74ffe..d797918 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -86,8 +86,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe this.messageQueue = new FifoMessageQueue(); } - JmsPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); - JmsRedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy().copy(); + JmsPrefetchPolicy 
prefetchPolicy = session.getPrefetchPolicy(); + JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy(); consumerInfo = new JmsConsumerInfo(consumerId); consumerInfo.setClientId(connection.getClientID()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java index 89165e4..e040068 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -58,7 +58,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { this.anonymousProducer = destination == null; JmsMessageIDBuilder messageIDBuilder = - session.getConnection().getMessageIDPolicy().getMessageIDBuilder(session, destination); + session.getMessageIDPolicy().getMessageIDBuilder(session, destination); this.producerInfo = new JmsProducerInfo(producerId, messageIDBuilder); this.producerInfo.setDestination(destination); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index b2f87ed..7b3d20f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -68,8 +68,10 @@ import org.apache.qpid.jms.meta.JmsProducerId; import org.apache.qpid.jms.meta.JmsProducerInfo; import org.apache.qpid.jms.meta.JmsSessionId; import org.apache.qpid.jms.meta.JmsSessionInfo; +import 
org.apache.qpid.jms.policy.JmsMessageIDPolicy; import org.apache.qpid.jms.policy.JmsPrefetchPolicy; import org.apache.qpid.jms.policy.JmsPresettlePolicy; +import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; @@ -94,8 +96,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private final AtomicBoolean started = new AtomicBoolean(); private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages = new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000); - private final JmsPrefetchPolicy prefetchPolicy; - private final JmsPresettlePolicy presettlePolicy; private final JmsSessionInfo sessionInfo; private volatile ExecutorService executor; private final ReentrantLock sendLock = new ReentrantLock(); @@ -109,8 +109,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { this.connection = connection; this.acknowledgementMode = acknowledgementMode; - this.prefetchPolicy = connection.getPrefetchPolicy().copy(); - this.presettlePolicy = connection.getPresettlePolicy().copy(); if (acknowledgementMode == SESSION_TRANSACTED) { setTransactionContext(new JmsLocalTransactionContext(this)); @@ -121,6 +119,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe sessionInfo = new JmsSessionInfo(sessionId); sessionInfo.setAcknowledgementMode(acknowledgementMode); sessionInfo.setSendAcksAsync(connection.isForceAsyncAcks()); + sessionInfo.setMessageIDPolicy(connection.getMessageIDPolicy().copy()); + sessionInfo.setPrefetchPolicy(connection.getPrefetchPolicy().copy()); + sessionInfo.setPresettlePolicy(connection.getPresettlePolicy().copy()); + 
sessionInfo.setRedeliveryPolicy(connection.getRedeliveryPolicy().copy()); connection.createResource(sessionInfo); @@ -710,7 +712,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe envelope.setDispatchId(messageSequence); if (producer.isAnonymous()) { - envelope.setPresettle(presettlePolicy.isProducerPresttled(this, destination)); + envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination)); } transactionContext.send(connection, envelope); @@ -923,12 +925,20 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } + public JmsMessageIDPolicy getMessageIDPolicy() { + return sessionInfo.getMessageIDPolicy(); + } + public JmsPrefetchPolicy getPrefetchPolicy() { - return prefetchPolicy; + return sessionInfo.getPrefetchPolicy(); } public JmsPresettlePolicy getPresettlePolicy() { - return presettlePolicy; + return sessionInfo.getPresettlePolicy(); + } + + public JmsRedeliveryPolicy getRedeliveryPolicy() { + return sessionInfo.getRedeliveryPolicy(); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java index c1927af..a4733b1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java @@ -18,11 +18,21 @@ package org.apache.qpid.jms.meta; import javax.jms.Session; +import org.apache.qpid.jms.policy.JmsMessageIDPolicy; +import org.apache.qpid.jms.policy.JmsPrefetchPolicy; +import org.apache.qpid.jms.policy.JmsPresettlePolicy; +import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; + public final class JmsSessionInfo implements JmsResource, 
Comparable<JmsSessionInfo> { private final JmsSessionId sessionId; + private int acknowledgementMode; private boolean sendAcksAsync; + private JmsMessageIDPolicy messageIDPolicy; + private JmsPrefetchPolicy prefetchPolicy; + private JmsPresettlePolicy presettlePolicy; + private JmsRedeliveryPolicy redeliveryPolicy; public JmsSessionInfo(JmsConnectionInfo connectionInfo, long sessionId) { if (connectionInfo == null) { @@ -110,4 +120,36 @@ public final class JmsSessionInfo implements JmsResource, Comparable<JmsSessionI public int compareTo(JmsSessionInfo other) { return sessionId.compareTo(other.sessionId); } + + public JmsMessageIDPolicy getMessageIDPolicy() { + return messageIDPolicy; + } + + public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) { + this.messageIDPolicy = messageIDPolicy; + } + + public JmsPrefetchPolicy getPrefetchPolicy() { + return prefetchPolicy; + } + + public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) { + this.prefetchPolicy = prefetchPolicy; + } + + public JmsPresettlePolicy getPresettlePolicy() { + return presettlePolicy; + } + + public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) { + this.presettlePolicy = presettlePolicy; + } + + public JmsRedeliveryPolicy getRedeliveryPolicy() { + return redeliveryPolicy; + } + + public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) { + this.redeliveryPolicy = redeliveryPolicy; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index fe9b4b4..bca3a57 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -59,6 +60,7 @@ import javax.jms.TopicSubscriber; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.JmsOperationTimedOutException; +import org.apache.qpid.jms.JmsSession; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -1541,4 +1543,24 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } } } + + @Test(timeout = 20000) + public void testSessionSnapshotsPolicyObjects() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertNotSame(session.getMessageIDPolicy(), connection.getMessageIDPolicy()); + assertNotSame(session.getPrefetchPolicy(), connection.getPrefetchPolicy()); + assertNotSame(session.getPresettlePolicy(), connection.getPresettlePolicy()); + assertNotSame(session.getRedeliveryPolicy(), connection.getRedeliveryPolicy()); + + testPeer.expectClose(); + connection.close(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
