QPIDJMS-207 Adds support for the JMS 2.0 Delayed Delivery feature Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6e442f4c Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6e442f4c Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6e442f4c
Branch: refs/heads/master Commit: 6e442f4c6aa1401a14031c6f2f05d7edbd58037c Parents: 0c39522 Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Sep 12 15:20:25 2016 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Sep 12 15:20:25 2016 -0400 ---------------------------------------------------------------------- .../message/JmsMessagePropertyIntercepter.java | 32 +++++ .../qpid/jms/message/JmsMessageSupport.java | 1 + .../provider/amqp/AmqpConnectionProperties.java | 23 ++++ .../jms/provider/amqp/AmqpFixedProducer.java | 13 +- .../qpid/jms/provider/amqp/AmqpSupport.java | 1 + .../amqp/message/AmqpJmsMessageFacade.java | 16 ++- .../amqp/message/AmqpMessageSupport.java | 5 + .../integration/ProducerIntegrationTest.java | 82 +++++++++++++ .../JmsMessagePropertyIntercepterTest.java | 110 +++++++++++++++++ .../amqp/message/AmqpJmsMessageFacadeTest.java | 10 ++ .../transports/netty/NettySimpleAmqpServer.java | 123 ++++++++++++++++++- 11 files changed, 407 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java index bb2eb0b..65c8c9a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java @@ -24,6 +24,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ; import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID; +import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERYTIME; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_EXPIRATION; @@ -135,6 +136,7 @@ public class JmsMessagePropertyIntercepter { STANDARD_HEADERS.add(JMS_TYPE); STANDARD_HEADERS.add(JMS_EXPIRATION); STANDARD_HEADERS.add(JMS_PRIORITY); + STANDARD_HEADERS.add(JMS_DELIVERYTIME); VENDOR_PROPERTIES.add(JMS_AMQP_ACK_TYPE); @@ -638,6 +640,36 @@ public class JmsMessagePropertyIntercepter { return true; } }); + PROPERTY_INTERCEPTERS.put(JMS_DELIVERYTIME, new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessage message) throws JMSException { + return Long.valueOf(message.getFacade().getDeliveryTime()); + } + + @Override + public void setProperty(JmsMessage message, Object value) throws JMSException { + Long rc = (Long) TypeConversionSupport.convert(value, Long.class); + if (rc == null) { + throw new JMSException("Property JMSDeliveryTime cannot be set from a " + value.getClass().getName() + "."); + } + message.getFacade().setDeliveryTime(rc.longValue()); + } + + @Override + public boolean propertyExists(JmsMessage message) { + return message.getFacade().getDeliveryTime() > 0; + } + + @Override + public void clearProperty(JmsMessage message) { + message.getFacade().setDeliveryTime(0); + } + + @Override + public boolean isAlwaysWritable() { + return false; + } + }); } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java index c3ce451..657542c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java @@ -31,6 +31,7 @@ public class JmsMessageSupport { public static final String JMS_CORRELATIONID = "JMSCorrelationID"; public static final String JMS_EXPIRATION = "JMSExpiration"; public static final String JMS_REDELIVERED = "JMSRedelivered"; + public static final String JMS_DELIVERYTIME = "JMSDeliveryTime"; public static final String JMSX_GROUPID = "JMSXGroupID"; public static final String JMSX_GROUPSEQ = "JMSXGroupSeq"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java index 79a0d95..c090853 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.amqp; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX; @@ -41,6 +42,7 @@ public class AmqpConnectionProperties { private final JmsConnectionInfo connectionInfo; + private boolean delayedDeliverySupported = false; private boolean anonymousRelaySupported = false; private boolean connectionOpenFailed = false; @@ -78,6 +80,10 @@ public class AmqpConnectionProperties { if (list.contains(ANONYMOUS_RELAY)) { anonymousRelaySupported = true; } + + if (list.contains(DELAYED_DELIVERY)) { + delayedDeliverySupported = true; + } } protected void processProperties(Map<Symbol, Object> properties) { @@ -104,6 +110,23 @@ public class AmqpConnectionProperties { } /** + * @return true if the connection supports sending message with delivery delays. + */ + public boolean isDelayedDeliverySupported() { + return delayedDeliverySupported; + } + + /** + * Sets if the connection supports sending message with assigned delivery delays. + * + * @param deliveryDelaySupported + * true if the delivery delay features is supported. + */ + public void setDeliveryDelaySupported(boolean deliveryDelaySupported) { + this.delayedDeliverySupported = deliveryDelaySupported; + } + + /** * @return true if the connection supports sending to an anonymous relay. */ public boolean isAnonymousRelaySupported() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 9233ce1..df39b78 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -68,12 +68,16 @@ public class AmqpFixedProducer extends AmqpProducer { private AsyncResult sendCompletionWatcher; + private final AmqpConnection connection; + public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) { - super(session, info); + this(session, info, null); } public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) { super(session, info, sender); + + connection = session.getConnection(); } @Override @@ -93,7 +97,12 @@ public class AmqpFixedProducer extends AmqpProducer { request.onFailure(new IllegalStateException("The MessageProducer is closed")); } - if (getEndpoint().getCredit() <= 0) { + if (!connection.getProperties().isDelayedDeliverySupported() && + envelope.getMessage().getJMSDeliveryTime() != 0) { + + // Don't allow sends with delay if the remote said it can't handle them + request.onFailure(new JMSException("Remote does not support delayed message delivery")); + } else if (getEndpoint().getCredit() <= 0) { LOG.trace("Holding Message send until credit is available."); InFlightSend send = new InFlightSend(envelope, request); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java index 10ae94f..9738d68 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java @@ -43,6 +43,7 @@ public class AmqpSupport { // Symbols used for connection capabilities public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container"); public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY"); // Symbols used to announce connection error information public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java index 82f63e0..89094a1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.provider.amqp.message; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_DELIVERY_TIME; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE; @@ -555,14 +556,21 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { @Override public long getDeliveryTime() { - // TODO Auto-generated method stub - return 0; + Object deliveryTime = getMessageAnnotation(JMS_DELIVERY_TIME); + if (deliveryTime != null) { + return (long) deliveryTime; + } + + return 0l; } @Override public void setDeliveryTime(long deliveryTime) { - // TODO Auto-generated method stub - + if (deliveryTime != 0) { + setMessageAnnotation(JMS_DELIVERY_TIME, deliveryTime); + } else { + removeMessageAnnotation(JMS_DELIVERY_TIME); + } } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java index 271481b..40987e1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java @@ -37,6 +37,11 @@ public final class AmqpMessageSupport { public static final String JMS_MSG_TYPE = "x-opt-jms-msg-type"; /** + * Attribute used to mark the Application defined delivery time assigned to the message + */ + public static final String JMS_DELIVERY_TIME = "x-opt-delivery-time"; + + /** * Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message * which has no body. */ http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 0dee795..0e6b445 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -18,11 +18,13 @@ */ package org.apache.qpid.jms.integration; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -86,6 +88,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSect import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.hamcrest.Matcher; @@ -1834,6 +1837,85 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + // DO NOT add capability to indicate server support for DELAYED-DELIVERY + + Connection connection = testFixture.establishConnecton(testPeer); + + connection.start(); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryDelay(5000); + + // Producer should fail to send when message has delivery delay since remote + // did not report that it supports that option. + Message message = session.createMessage(); + try { + producer.send(message); + fail("Send should fail"); + } catch (JMSException jmsEx) { + LOG.debug("Caught expected error from failed send."); + } + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testSendWorksWhenDelayedDeliveryIsSupported() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + String topicName = "myTopic"; + + // DO add capability to indicate server support for DELAYED-DELIVERY + + Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY }); + + connection.start(); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME); + msgAnnotationsMatcher.withEntry(annotationKey, notNullValue()); + + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + testPeer.expectTransfer(messageMatcher); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic dest = session.createTopic(topicName); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryDelay(5000); + producer.send(session.createMessage()); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testAsyncCompletionAfterSendMessageGetDispoation() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java index 75a1a78..055602d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java @@ -22,6 +22,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ; import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID; +import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERYTIME; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION; import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_EXPIRATION; @@ -1778,4 +1779,113 @@ public class JmsMessagePropertyIntercepterTest { JmsMessagePropertyIntercepter.clearProperties(message, true); assertFalse(JmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_ACK_TYPE)); } + + //---------- JMSDeliveryTime ---------------------------------------------// + + @Test + public void testJMSDeliveryTimeInGetAllPropertyNames() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + assertTrue(JmsMessagePropertyIntercepter.getAllPropertyNames(message).contains(JMS_DELIVERYTIME)); + } + + @Test + public void testGetJMSDeliveryWhenNotSet() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + Mockito.when(facade.getDeliveryTime()).thenReturn(0L); + assertEquals(Long.valueOf(0L), JmsMessagePropertyIntercepter.getProperty(message, JMS_DELIVERYTIME)); + Mockito.verify(facade).getDeliveryTime(); + } + + @Test + public void testGetJMSDeliveryTimeWhenSet() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + Mockito.when(facade.getDeliveryTime()).thenReturn(900L); + assertEquals(900L, JmsMessagePropertyIntercepter.getProperty(message, JMS_DELIVERYTIME)); + } + + @Test + public void testSetJMSDeliveryTime() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + JmsMessagePropertyIntercepter.setProperty(message, JMS_DELIVERYTIME, 65536L); + Mockito.verify(facade).setDeliveryTime(65536L); + } + + @Test + public void testJMSDeliveryTimeInGetPropertyNamesWhenSet() throws JMSException { + doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(false); + } + + @Test + public void testJMSDeliveryTimeNotInGetPropertyNamesWhenSetAndExcludingStandardJMSHeaders() throws JMSException { + doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(true); + } + + private void doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(boolean excludeStandardJmsHeaders) throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + Mockito.when(facade.getDeliveryTime()).thenReturn(900L); + if (excludeStandardJmsHeaders) { + assertFalse(JmsMessagePropertyIntercepter.getPropertyNames(message, true).contains(JMS_DELIVERYTIME)); + } else { + assertTrue(JmsMessagePropertyIntercepter.getPropertyNames(message, false).contains(JMS_DELIVERYTIME)); + } + } + + @Test + public void testJMSDeliveryTimeNotInGetPropertyNamesWhenNotSet() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + assertFalse(JmsMessagePropertyIntercepter.getPropertyNames(message, false).contains(JMS_DELIVERYTIME)); + } + + @Test + public void testJMSDeliveryTimePropertExistsWhenSet() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + Mockito.when(facade.getDeliveryTime()).thenReturn(900L); + assertTrue(JmsMessagePropertyIntercepter.propertyExists(message, JMS_DELIVERYTIME)); + } + + @Test + public void testJMSDeliveryTimePropertExistsWhenNotSet() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + Mockito.when(facade.getDeliveryTime()).thenReturn(0L); + assertFalse(JmsMessagePropertyIntercepter.propertyExists(message, JMS_DELIVERYTIME)); + } + + @Test + public void testSetJMSDeliveryTimeConversionChecks() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + try { + JmsMessagePropertyIntercepter.setProperty(message, JMS_DELIVERYTIME, new byte[1]); + fail("Should have thrown an exception for this call"); + } catch (JMSException e) { + } + } + + @Test + public void testJMSDeliveryTimeClearedWhenRequested() throws JMSException { + JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class); + JmsMessage message = Mockito.mock(JmsMapMessage.class); + Mockito.when(message.getFacade()).thenReturn(facade); + JmsMessagePropertyIntercepter.clearProperties(message, true); + Mockito.verify(facade, Mockito.never()).setDeliveryTime(0); + JmsMessagePropertyIntercepter.clearProperties(message, false); + Mockito.verify(facade).setDeliveryTime(0); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java index e430f87..bd50a67 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java @@ -1518,6 +1518,16 @@ public class AmqpJmsMessageFacadeTest extends AmqpJmsMessageTypesTestCase { } @Test + public void testNewMessageDoesNotHaveUnderlyingMessageAnnotationsSectionWithDeliveryTime() { + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade();; + + Message underlying = amqpMessageFacade.getAmqpMessage(); + assertNotNull(underlying.getMessageAnnotations()); + Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME); + assertNull(underlying.getMessageAnnotations().getValue().get(annotationKey)); + } + + @Test public void testMessageAnnotationExistsUsingReceivedMessageWithoutMessageAnnotationsSection() { String symbolKeyName = "myTestSymbolName"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java index 7c23d86..1525144 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java @@ -19,6 +19,7 @@ package org.apache.qpid.jms.transports.netty; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONTAINER_ID; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.INVALID_FIELD; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PLATFORM; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PRODUCT; @@ -32,8 +33,13 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.IdGenerator; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; @@ -42,7 +48,9 @@ import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; @@ -60,15 +68,19 @@ import io.netty.channel.SimpleChannelInboundHandler; * Simple Netty based server that can handle a small subset of AMQP events * using Proton-J as the protocol engine. */ +@SuppressWarnings( "unused" ) public class NettySimpleAmqpServer extends NettyServer { private static final Logger LOG = LoggerFactory.getLogger(NettySimpleAmqpServer.class); + private static final AtomicInteger SERVER_SEQUENCE = new AtomicInteger(); + private static final int CHANNEL_MAX = 32767; private static final int HEADER_SIZE = 8; private static final int SASL_PROTOCOL = 3; private final Map<String, List<Connection>> connections = new HashMap<String, List<Connection>>(); + private final ScheduledExecutorService serializer; private boolean allowNonSaslConnections; private ConnectionIntercepter connectionIntercepter; @@ -83,6 +95,18 @@ public class NettySimpleAmqpServer extends NettyServer { public NettySimpleAmqpServer(TransportOptions options, boolean needClientAuth, boolean webSocketServer) { super(options, needClientAuth, webSocketServer); + + this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName(NettySimpleAmqpServer.this.getClass().getSimpleName() + ":(" + + SERVER_SEQUENCE.incrementAndGet() + "):Worker"); + return serial; + } + }); } @Override @@ -111,11 +135,15 @@ public class NettySimpleAmqpServer extends NettyServer { private final class ProtonConnection extends SimpleChannelInboundHandler<ByteBuf> { + private final IdGenerator sessionIdGenerator = new IdGenerator(); + private final Transport protonTransport = Proton.transport(); private final Connection protonConnection = Proton.connection(); private final Collector eventCollector = new CollectorImpl(); private SaslAuthenticator authenticator; + private final Map<String, ProtonSession> sessions = new HashMap<String, ProtonSession>(); + private boolean exclusiveContainerId; private boolean headerRead; private final ByteBuf headerBuf = Unpooled.buffer(HEADER_SIZE, HEADER_SIZE); @@ -223,6 +251,21 @@ public class NettySimpleAmqpServer extends NettyServer { case SESSION_REMOTE_CLOSE: processSessionClose(event.getSession()); break; + case LINK_REMOTE_OPEN: + //processLinkOpen(event.getLink()); + break; + case LINK_REMOTE_DETACH: + //processLinkDetach(event.getLink()); + break; + case LINK_REMOTE_CLOSE: + //processLinkClose(event.getLink()); + break; + case LINK_FLOW: + //processLinkFlow(event.getLink()); + break; + case DELIVERY: + //processDelivery(event.getDelivery()); + break; default: break; } @@ -263,11 +306,17 @@ public class NettySimpleAmqpServer extends NettyServer { } private void processSessionClose(Session session) { + ProtonSession protonSession = (ProtonSession) session.getContext(); + + sessions.remove(protonSession.getId()); + session.close(); session.free(); } private void processSessionOpen(Session session) { + ProtonSession protonSession = new ProtonSession(sessionIdGenerator.generateId(), session); + sessions.put(protonSession.getId(), protonSession); session.open(); } @@ -387,7 +436,7 @@ public class NettySimpleAmqpServer extends NettyServer { } private Symbol[] getConnectionCapabilitiesOffered() { - return new Symbol[]{ ANONYMOUS_RELAY }; + return new Symbol[]{ ANONYMOUS_RELAY, DELAYED_DELIVERY }; } private Map<Symbol, Object> getConnetionProperties() { @@ -453,7 +502,74 @@ public class NettySimpleAmqpServer extends NettyServer { } - //----- Internal Type Implementations ------------------------------------// + //----- Session Manager --------------------------------------------------// + + private class ProtonSession { + + private final String sessionId; + private final Session session; + + private Map<String, ProtonSender> senders = new HashMap<String, ProtonSender>(); + private Map<String, ProtonReceiver> receivers = new HashMap<String, ProtonReceiver>(); + + public ProtonSession(String sessionId, Session session) { + this.sessionId = sessionId; + this.session = session; + this.session.setContext(this); + } + + public Session getSession() { + return session; + } + + public String getId() { + return sessionId; + } + } + + //----- Sender Manager ---------------------------------------------------// + + private class ProtonSender { + + private final String senderId; + private final Sender sender; + + public ProtonSender(String senderId, Sender sender) { + this.senderId = senderId; + this.sender = sender; + } + + public String getId() { + return senderId; + } + + public Sender getSender() { + return sender; + } + } + + //----- Receiver Manager ---------------------------------------------------// + + private class ProtonReceiver { + + private final String receiverId; + private final Receiver receiver; + + public ProtonReceiver(String receiverId, Receiver receiver) { + this.receiverId = receiverId; + this.receiver = receiver; + } + + public String getId() { + return receiverId; + } + + public Receiver getReceiver() { + return receiver; + } + } + + //----- SASL Authentication Manager --------------------------------------// private class SaslAuthenticator { @@ -496,7 +612,8 @@ public class NettySimpleAmqpServer extends NettyServer { } } - @SuppressWarnings("unused") + //----- Simple AMQP Header Wrapper ---------------------------------------// + private class AmqpHeader { private final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org