Author: tabish
Date: Mon Oct 4 20:35:18 2010
New Revision: 1004411
URL: http://svn.apache.org/viewvc?rev=1004411&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQ-2941
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java?rev=1004411&r1=1004410&r2=1004411&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
Mon Oct 4 20:35:18 2010
@@ -18,7 +18,7 @@ package org.apache.activemq;
public interface ScheduledMessage {
/**
- * The time in milliseconds that a message will wait before being
scheduled to be
+ * The time in milliseconds that a message will wait before being
scheduled to be
* delivered by the broker
*/
public static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
@@ -34,6 +34,48 @@ public interface ScheduledMessage {
* Use a Cron tab entry to set the schedule
*/
public static final String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON";
-
+ /**
+ * An Id that is assigned to a Scheduled Message, this value is only
available once the
+ * Message is scheduled, Messages sent to the Browse Destination or
delivered to the
+ * assigned Destination will have this value set.
+ */
+ public static final String AMQ_SCHEDULED_ID = "scheduledJobId";
+
+ /**
+ * Special destination to send Message's to with an assigned "action" that
the Scheduler
+ * should perform such as removing a message.
+ */
+ public static final String AMQ_SCHEDULER_MANAGEMENT_DESTINATION =
"ActiveMQ.Scheduler.Management";
+ /**
+ * Used to specify that a some operation should be performed on the
Scheduled Message,
+ * the Message must have an assigned Id for this action to be taken.
+ */
+ public static final String AMQ_SCHEDULER_ACTION = "AMQ_SCHEDULER_ACTION";
+
+ /**
+ * Indicates that a browse of the Scheduled Messages is being requested.
+ */
+ public static final String AMQ_SCHEDULER_ACTION_BROWSE = "BROWSE";
+ /**
+ * Indicates that a Scheduled Message is to be remove from the Scheduler,
the Id of
+ * the scheduled message must be set as a property in order for this
action to have
+ * any effect.
+ */
+ public static final String AMQ_SCHEDULER_ACTION_REMOVE = "REMOVE";
+ /**
+ * Indicates that all scheduled Messages should be removed.
+ */
+ public static final String AMQ_SCHEDULER_ACTION_REMOVEALL = "REMOVEALL";
+
+ /**
+ * A property that holds the beginning of the time interval that the
specified action should
+ * be applied within. Maps to a long value that specified time in
milliseconds since UTC.
+ */
+ public static final String AMQ_SCHEDULER_ACTION_START_TIME =
"ACTION_START_TIME";
+ /**
+ * A property that holds the end of the time interval that the specified
action should be
+ * applied within. Maps to a long value that specified time in
milliseconds since UTC.
+ */
+ public static final String AMQ_SCHEDULER_ACTION_END_TIME =
"ACTION_END_TIME";
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1004411&r1=1004410&r2=1004411&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
Mon Oct 4 20:35:18 2010
@@ -18,11 +18,14 @@ package org.apache.activemq.broker.sched
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
@@ -107,31 +110,80 @@ public class SchedulerBroker extends Bro
long period = 0;
int repeat = 0;
String cronEntry = "";
+ String jobId = (String)
messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
Object cronValue =
messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
Object periodValue =
messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
Object delayValue =
messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
- if (cronValue != null || periodValue != null || delayValue != null) {
+ String physicalName = messageSend.getDestination().getPhysicalName();
+ boolean schedularManage = physicalName.regionMatches(true, 0,
+ ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION,
0,
+
ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
+
+ if (schedularManage == true) {
+
+ JobScheduler scheduler = getInternalScheduler();
+ ActiveMQDestination replyTo = messageSend.getReplyTo();
+
+ String action = (String)
messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
+
+ if (action != null ) {
+
+ Object startTime =
messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
+ Object endTime =
messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
+
+ if (replyTo != null &&
action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
+
+ if( startTime != null && endTime != null ) {
+
+ long start = (Long)
TypeConversionSupport.convert(startTime, Long.class);
+ long finish = (Long)
TypeConversionSupport.convert(endTime, Long.class);
+
+ for (Job job :
scheduler.getAllJobs(start, finish)) {
+
sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+ }
+ } else {
+ for (Job job : scheduler.getAllJobs()) {
+
sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+ }
+ }
+ }
+ if (jobId != null &&
action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
+ scheduler.remove(jobId);
+ } else if
(action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
+
+ if( startTime != null && endTime != null ) {
+
+ long start = (Long)
TypeConversionSupport.convert(startTime, Long.class);
+ long finish = (Long)
TypeConversionSupport.convert(endTime, Long.class);
+
+ scheduler.removeAllJobs(start, finish);
+ } else {
+ scheduler.removeAllJobs();
+ }
+ }
+ }
+
+ } else if ((cronValue != null || periodValue != null || delayValue !=
null) && jobId == null) {
//clear transaction context
Message msg = messageSend.copy();
msg.setTransactionId(null);
org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(msg);
- if (cronValue != null) {
- cronEntry = cronValue.toString();
- }
- if (periodValue != null) {
- period = (Long) TypeConversionSupport.convert(periodValue,
Long.class);
- }
- if (delayValue != null) {
- delay = (Long) TypeConversionSupport.convert(delayValue,
Long.class);
- }
- Object repeatValue =
msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
- if (repeatValue != null) {
- repeat = (Integer)
TypeConversionSupport.convert(repeatValue, Integer.class);
- }
- getInternalScheduler().schedule(msg.getMessageId().toString(),
- new ByteSequence(packet.data, packet.offset,
packet.length),cronEntry, delay, period, repeat);
-
+ if (cronValue != null) {
+ cronEntry = cronValue.toString();
+ }
+ if (periodValue != null) {
+ period = (Long) TypeConversionSupport.convert(periodValue,
Long.class);
+ }
+ if (delayValue != null) {
+ delay = (Long) TypeConversionSupport.convert(delayValue,
Long.class);
+ }
+ Object repeatValue =
msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+ if (repeatValue != null) {
+ repeat = (Integer) TypeConversionSupport.convert(repeatValue,
Integer.class);
+ }
+ getInternalScheduler().schedule(msg.getMessageId().toString(),
+ new ByteSequence(packet.data, packet.offset,
packet.length),cronEntry, delay, period, repeat);
} else {
super.send(producerExchange, messageSend);
@@ -151,14 +203,14 @@ public class SchedulerBroker extends Bro
if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue,
Integer.class);
}
-
- if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
- // create a unique id - the original message could be sent
- // lots of times
- messageSend
- .setMessageId(new MessageId(this.producerId,
this.messageIdGenerator.getNextSequenceId()));
- }
-
+
+ if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
+ // create a unique id - the original message could be sent
+ // lots of times
+ messageSend.setMessageId(
+ new MessageId(this.producerId,
this.messageIdGenerator.getNextSequenceId()));
+ }
+
// Add the jobId as a property
messageSend.setProperty("scheduledJobId", id);
@@ -176,7 +228,6 @@ public class SchedulerBroker extends Bro
} catch (Exception e) {
LOG.error("Failed to send scheduled message " + id, e);
}
-
}
protected synchronized JobScheduler getInternalScheduler() throws
Exception {
@@ -202,4 +253,37 @@ public class SchedulerBroker extends Bro
return null;
}
+ protected void sendScheduledJob(ConnectionContext context, Job job,
ActiveMQDestination replyTo)
+ throws Exception {
+
+ org.apache.activemq.util.ByteSequence packet = new
org.apache.activemq.util.ByteSequence(job.getPayload());
+ try {
+ Message msg = (Message) this.wireFormat.unmarshal(packet);
+ msg.setOriginalTransactionId(null);
+ msg.setPersistent(false);
+ msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+ msg.setMessageId(new MessageId(this.producerId,
this.messageIdGenerator.getNextSequenceId()));
+ msg.setDestination(replyTo);
+ msg.setResponseRequired(false);
+ msg.setProducerId(this.producerId);
+
+ // Add the jobId as a property
+ msg.setProperty("scheduledJobId", job.getJobId());
+
+ final boolean originalFlowControl =
context.isProducerFlowControl();
+ final ProducerBrokerExchange producerExchange = new
ProducerBrokerExchange();
+ producerExchange.setConnectionContext(context);
+ producerExchange.setMutable(true);
+ producerExchange.setProducerState(new ProducerState(new
ProducerInfo()));
+ try {
+ context.setProducerFlowControl(false);
+ this.next.send(producerExchange, msg);
+ } finally {
+ context.setProducerFlowControl(originalFlowControl);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to send scheduled message " + job.getJobId(), e);
+ }
+
+ }
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java?rev=1004411&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
Mon Oct 4 20:35:18 2010
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
+
+ private static final transient Log LOG =
LogFactory.getLog(JobSchedulerManagementTest.class);
+
+ public void testRemoveAllScheduled() throws Exception {
+ final int COUNT = 5;
+ Connection connection = createConnection();
+
+ // Setup the scheduled Message
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the Browse Destination and the Reply To location
+ Destination management =
session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+
+ // Create the eventual Consumer to receive the scheduled message
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+
+ connection.start();
+
+ // Send the remove request
+ MessageProducer producer = session.createProducer(management);
+ Message request = session.createMessage();
+ request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+ producer.send(request);
+
+ // Now wait and see if any get delivered, none should.
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(latch.getCount(), COUNT);
+ }
+
+ public void testRemoveAllScheduledAtTime() throws Exception {
+ final int COUNT = 3;
+ Connection connection = createConnection();
+
+ // Setup the scheduled Message
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6));
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(15));
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20));
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the Browse Destination and the Reply To location
+ Destination management =
session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+ Destination browseDest = session.createTemporaryQueue();
+
+ // Create the eventual Consumer to receive the scheduled message
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+
+ // Create the "Browser"
+ MessageConsumer browser = session.createConsumer(browseDest);
+ final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+ browser.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ browsedLatch.countDown();
+ LOG.debug("Scheduled Message Browser got Message: " + message);
+ }
+ });
+
+ connection.start();
+
+ long start = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(10);
+ long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+
+ // Send the remove request
+ MessageProducer producer = session.createProducer(management);
+ Message request = session.createMessage();
+ request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME,
Long.toString(start));
+
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME,
Long.toString(end));
+ producer.send(request);
+
+ // Send the browse request
+ request = session.createMessage();
+ request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+ request.setJMSReplyTo(browseDest);
+ producer.send(request);
+
+ // now see if we got back only the one remaining message.
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(2, browsedLatch.getCount());
+
+ // Now wait and see if any get delivered, none should.
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(2, latch.getCount());
+ }
+
+ public void testBrowseAllScheduled() throws Exception {
+ final int COUNT = 10;
+ Connection connection = createConnection();
+
+ // Setup the scheduled Message
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT);
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the Browse Destination and the Reply To location
+ Destination requestBrowse =
session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+ Destination browseDest = session.createTemporaryQueue();
+
+ // Create the eventual Consumer to receive the scheduled message
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+
+ // Create the "Browser"
+ MessageConsumer browser = session.createConsumer(browseDest);
+ final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+ browser.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ browsedLatch.countDown();
+ LOG.debug("Scheduled Message Browser got Message: " + message);
+ }
+ });
+
+ connection.start();
+
+ // Send the browse request
+ MessageProducer producer = session.createProducer(requestBrowse);
+ Message request = session.createMessage();
+ request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+ request.setJMSReplyTo(browseDest);
+ producer.send(request);
+
+ // make sure the message isn't delivered early because we browsed it
+ Thread.sleep(2000);
+ assertEquals(latch.getCount(), COUNT);
+
+ // now see if we got all the scheduled messages on the browse
destination.
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(browsedLatch.getCount(), 0);
+
+ // now check that they all got delivered
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(latch.getCount(), 0);
+ }
+
+ public void testBrowseWindowlScheduled() throws Exception {
+ final int COUNT = 10;
+ Connection connection = createConnection();
+
+ // Setup the scheduled Message
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5));
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10), COUNT);
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20));
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the Browse Destination and the Reply To location
+ Destination requestBrowse =
session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+ Destination browseDest = session.createTemporaryQueue();
+
+ // Create the eventual Consumer to receive the scheduled message
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ final CountDownLatch latch = new CountDownLatch(COUNT + 2);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+
+ // Create the "Browser"
+ MessageConsumer browser = session.createConsumer(browseDest);
+ final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+ browser.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ browsedLatch.countDown();
+ LOG.debug("Scheduled Message Browser got Message: " + message);
+ }
+ });
+
+ connection.start();
+
+ long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(6);
+ long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(15);
+
+ // Send the browse request
+ MessageProducer producer = session.createProducer(requestBrowse);
+ Message request = session.createMessage();
+ request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME,
Long.toString(start));
+
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME,
Long.toString(end));
+ request.setJMSReplyTo(browseDest);
+ producer.send(request);
+
+ // make sure the message isn't delivered early because we browsed it
+ Thread.sleep(2000);
+ assertEquals(COUNT + 2, latch.getCount());
+
+ // now see if we got all the scheduled messages on the browse
destination.
+ latch.await(15, TimeUnit.SECONDS);
+ assertEquals(0, browsedLatch.getCount());
+
+ // now see if we got all the scheduled messages on the browse
destination.
+ latch.await(20, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+ public void testRemoveScheduled() throws Exception {
+ final int COUNT = 10;
+ Connection connection = createConnection();
+
+ // Setup the scheduled Message
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT);
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the Browse Destination and the Reply To location
+ Destination management =
session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+ Destination browseDest = session.createTemporaryQueue();
+
+ // Create the eventual Consumer to receive the scheduled message
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(management);
+
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+
+ // Create the "Browser"
+ Session browseSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer browser = browseSession.createConsumer(browseDest);
+
+ connection.start();
+
+ // Send the browse request
+ Message request = session.createMessage();
+ request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+
ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+ request.setJMSReplyTo(browseDest);
+ producer.send(request);
+
+ // Browse all the Scheduled Messages.
+ for (int i = 0; i < COUNT; ++i) {
+ Message message = browser.receive(2000);
+ assertNotNull(message);
+
+ try{
+ Message remove = session.createMessage();
+
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
+
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
+
message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
+ producer.send(remove);
+ } catch(Exception e) {
+ }
+ }
+
+ // now check that they all got removed and are not delivered.
+ latch.await(11, TimeUnit.SECONDS);
+ assertEquals(COUNT, latch.getCount());
+ }
+
+ public void testRemoveNotScheduled() throws Exception {
+ Connection connection = createConnection();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the Browse Destination and the Reply To location
+ Destination management =
session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+
+ MessageProducer producer = session.createProducer(management);
+
+ try{
+
+ // Send the remove request
+ Message remove = session.createMessage();
+
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+
ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new
IdGenerator().generateId());
+ producer.send(remove);
+ } catch(Exception e) {
+ fail("Caught unexpected exception during remove of unscheduled
message.");
+ }
+ }
+
+ public void testBrowseWithSelector() throws Exception {
+ Connection connection = createConnection();
+
+ // Setup the scheduled Message
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9));
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10));
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5));
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(45));
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the Browse Destination and the Reply To location
+ Destination requestBrowse =
session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+ Destination browseDest = session.createTemporaryTopic();
+
+ // Create the "Browser"
+ MessageConsumer browser = session.createConsumer(browseDest,
ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000" );
+
+ connection.start();
+
+ // Send the browse request
+ MessageProducer producer = session.createProducer(requestBrowse);
+ Message request = session.createMessage();
+ request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+ request.setJMSReplyTo(browseDest);
+ producer.send(request);
+
+ // Now try and receive the one we selected
+ Message message = browser.receive(5000);
+ assertNotNull(message);
+ assertEquals(45000,
message.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY));
+
+ // Now check if there are anymore, there shouldn't be
+ message = browser.receive(5000);
+ assertNull(message);
+ }
+
+
+ protected void scheduleMessage(Connection connection, long delay) throws
Exception {
+ scheduleMessage(connection, delay, 1);
+ }
+
+ protected void scheduleMessage(Connection connection, long delay, int
count) throws Exception {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage("test msg");
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+
+ for(int i = 0; i < count; ++i ) {
+ producer.send(message);
+ }
+
+ producer.close();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ bindAddress = "vm://localhost";
+ super.setUp();
+ }
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ return createBroker(true);
+ }
+
+ protected BrokerService createBroker(boolean delete) throws Exception {
+ File schedulerDirectory = new File("target/scheduler");
+ if (delete) {
+ IOHelper.mkdirs(schedulerDirectory);
+ IOHelper.deleteChildren(schedulerDirectory);
+ }
+ BrokerService answer = new BrokerService();
+ answer.setPersistent(isPersistent());
+ answer.setDeleteAllMessagesOnStartup(true);
+ answer.setDataDirectory("target");
+ answer.setSchedulerDirectoryFile(schedulerDirectory);
+ answer.setUseJmx(false);
+ answer.addConnector(bindAddress);
+ return answer;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
------------------------------------------------------------------------------
svn:eol-style = native