Repository: activemq Updated Branches: refs/heads/master 078a101cf -> 5d353e241
https://issues.apache.org/jira/browse/AMQ-6037 Add support for amqp style variants of the ActiveMQ job scheduler options set in message annotations. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5d353e24 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5d353e24 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5d353e24 Branch: refs/heads/master Commit: 5d353e241b0ba76ad1a0b42ef5c7a2ae54106860 Parents: 078a101 Author: Timothy Bish <[email protected]> Authored: Tue Nov 10 18:12:40 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Nov 10 18:12:40 2015 -0500 ---------------------------------------------------------------------- .../amqp/message/InboundTransformer.java | 23 ++++- .../amqp/interop/AmqpScheduledMessageTest.java | 91 +++++++++++++++++++- .../src/test/resources/log4j.properties | 1 + 3 files changed, 112 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5d353e24/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index a824cfb..c3dc1d3 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -137,13 +137,32 @@ public abstract class InboundTransformer { if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { // Legacy annotation, JMSType value will be replaced by Subject further down if also present. jms.setJMSType(entry.getValue().toString()); - } - if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { long deliveryTime = ((Number) entry.getValue()).longValue(); long delay = deliveryTime - System.currentTimeMillis(); if (delay > 0) { jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); } + } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { + long delay = ((Number) entry.getValue()).longValue(); + if (delay > 0) { + jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); + } + } else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) { + int repeat = ((Number) entry.getValue()).intValue(); + if (repeat > 0) { + jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); + } + } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) { + long period = ((Number) entry.getValue()).longValue(); + if (period > 0) { + jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); + } + } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) { + String cronEntry = (String) entry.getValue(); + if (cronEntry != null) { + jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry); + } } setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); http://git-wip-us.apache.org/repos/asf/activemq/blob/5d353e24/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java index cbe3598..14f4752 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.interop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.TimeUnit; @@ -148,13 +149,100 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { sender.send(message); sender.close(); + // Read the message with short timeout, shouldn't get it. + try { + readMessages(getTestName(), 1, false, 500); + fail("Should not read the message"); + } catch (Throwable ex) { + } + // Read the message readMessages(getTestName(), 1, false); connection.close(); } + @Test + public void testScheduleWithDelay() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + // Get the Queue View early to avoid racing the delivery. + assertEquals(1, brokerService.getAdminView().getQueues().length); + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + long delay = 5000; + message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + // Read the message with short timeout, shouldn't get it. + try { + readMessages(getTestName(), 1, false, 1000); + fail("Should not read the message"); + } catch (Throwable ex) { + } + + // Read the message with long timeout, should get it. + try { + readMessages(getTestName(), 1, false, 10000); + } catch (Throwable ex) { + fail("Should read the message"); + } + + connection.close(); + } + + @Test + public void testScheduleRepeated() throws Exception { + final int NUMBER = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + // Get the Queue View early to avoid racing the delivery. + assertEquals(1, brokerService.getAdminView().getQueues().length); + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + long delay = 1000; + message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setMessageAnnotation("x-opt-delivery-period", 500); + message.setMessageAnnotation("x-opt-delivery-repeat", NUMBER - 1); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + readMessages(getTestName(), NUMBER, false); + // Read the message with short timeout, shouldn't get it. + try { + readMessages(getTestName(), 1, false, 600); + fail("Should not read more messages"); + } catch (Throwable ex) { + } + + connection.close(); + } + public void readMessages(String destinationName, int count, boolean topic) throws Exception { + readMessages(destinationName, count, topic, 5000); + } + + public void readMessages(String destinationName, int count, boolean topic, long timeout) throws Exception { Connection connection = createJMSConnection(); connection.start(); @@ -169,8 +257,9 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { MessageConsumer consumer = session.createConsumer(destination); for (int i = 1; i <= count; i++) { - Message received = consumer.receive(5000); + Message received = consumer.receive(timeout); assertNotNull(received); + LOG.info("Read next message: {}", received.getJMSMessageID()); } } finally { connection.close(); http://git-wip-us.apache.org/repos/asf/activemq/blob/5d353e24/activemq-amqp/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties index 63c4701..4111e16 100755 --- a/activemq-amqp/src/test/resources/log4j.properties +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -20,6 +20,7 @@ # log4j.rootLogger=WARN, console, file log4j.logger.org.apache.activemq=INFO +log4j.logger.org.apache.activemq.broker.scheduler=TRACE log4j.logger.org.apache.activemq.transport.amqp=DEBUG log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.fusesource=INFO
