http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
new file mode 100644
index 0000000..b5d2227
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.openmbean.TabularData;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests of the JMX JobSchedulerStore management MBean.
+ */
+public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobSchedulerJmxManagementTests.class);
+
+    /**
+     * Checks that the job scheduler management view can be obtained and that it
+     * lists no jobs before anything has been scheduled.
+     */
+    @Test
+    public void testJobSchedulerMBeanIsRegistered() throws Exception {
+        final JobSchedulerViewMBean schedulerView = getJobSchedulerMBean();
+        assertNotNull(schedulerView);
+
+        final TabularData knownJobs = schedulerView.getAllJobs();
+        assertTrue(knownJobs.isEmpty());
+    }
+
+    /**
+     * Schedules two delayed messages one after the other and checks that the
+     * number of jobs reported by the view grows by one each time.
+     */
+    @Test
+    public void testGetNumberOfJobs() throws Exception {
+        final JobSchedulerViewMBean schedulerView = getJobSchedulerMBean();
+        assertNotNull(schedulerView);
+        assertTrue(schedulerView.getAllJobs().isEmpty());
+
+        // First job: the view must no longer be empty and must hold one entry.
+        scheduleMessage(60000, -1, -1);
+        assertFalse(schedulerView.getAllJobs().isEmpty());
+        assertEquals(1, schedulerView.getAllJobs().size());
+
+        // Second job: the entry count goes up to two.
+        scheduleMessage(60000, -1, -1);
+        assertEquals(2, schedulerView.getAllJobs().size());
+    }
+
+    /**
+     * Schedules a single job, removes it through the view by its id and checks
+     * that the view is empty again afterwards.
+     */
+    @Test
+    public void testRemvoeJob() throws Exception {
+        final JobSchedulerViewMBean schedulerView = getJobSchedulerMBean();
+        assertNotNull(schedulerView);
+        assertTrue(schedulerView.getAllJobs().isEmpty());
+
+        scheduleMessage(60000, -1, -1);
+        assertFalse(schedulerView.getAllJobs().isEmpty());
+
+        final TabularData scheduled = schedulerView.getAllJobs();
+        assertEquals(1, scheduled.size());
+
+        // Every key of the tabular data is a list; its first element is used as the job id.
+        for (Object rowKey : scheduled.keySet()) {
+            final List<?> keyParts = (List<?>) rowKey;
+            final String jobId = keyParts.get(0).toString();
+            LOG.info("Attempting to remove Job: {}", jobId);
+            schedulerView.removeJob(jobId);
+        }
+
+        assertTrue(schedulerView.getAllJobs().isEmpty());
+    }
+
+    /**
+     * Schedules a job 60 seconds out and removes it with a ranged removal that
+     * spans from the current time to two minutes from now.
+     */
+    @Test
+    public void testRemvoeJobInRange() throws Exception {
+        final JobSchedulerViewMBean schedulerView = getJobSchedulerMBean();
+        assertNotNull(schedulerView);
+        assertTrue(schedulerView.getAllJobs().isEmpty());
+
+        scheduleMessage(60000, -1, -1);
+        assertFalse(schedulerView.getAllJobs().isEmpty());
+
+        // The removal window is passed to the view as formatted date strings.
+        final String rangeStart = JobSupport.getDateTime(System.currentTimeMillis());
+        final String rangeEnd = JobSupport.getDateTime(System.currentTimeMillis() + 120 * 1000);
+        schedulerView.removeAllJobs(rangeStart, rangeEnd);
+
+        assertTrue(schedulerView.getAllJobs().isEmpty());
+    }
+
+    /**
+     * Schedules a job 60 seconds out and checks that the next schedule time
+     * reported by the view lies between 57 and 63 seconds from now.
+     */
+    @Test
+    public void testGetNextScheduledJob() throws Exception {
+        final JobSchedulerViewMBean schedulerView = getJobSchedulerMBean();
+        assertNotNull(schedulerView);
+        assertTrue(schedulerView.getAllJobs().isEmpty());
+
+        scheduleMessage(60000, -1, -1);
+        assertFalse(schedulerView.getAllJobs().isEmpty());
+
+        // Accept three seconds of tolerance on either side of the 60 second delay.
+        final long lowerBound = System.currentTimeMillis() + 57 * 1000;
+        final long upperBound = System.currentTimeMillis() + 63 * 1000;
+
+        final String nextAsText = schedulerView.getNextScheduleTime();
+        final long nextAsMillis = JobSupport.getDataTime(nextAsText);
+        LOG.info("Next Scheduled Time: {} should be after: {}", nextAsText, JobSupport.getDateTime(lowerBound));
+
+        assertTrue(nextAsMillis > lowerBound);
+        assertTrue(nextAsMillis < upperBound);
+    }
+
+    /**
+     * Schedules a repeating job (10 second delay, 1 second period, 10 repeats),
+     * checks that its execution count starts at zero and then waits until the
+     * view reports at least one execution.
+     */
+    @Test
+    public void testGetExecutionCount() throws Exception {
+        final JobSchedulerViewMBean schedulerView = getJobSchedulerMBean();
+        assertNotNull(schedulerView);
+        assertTrue(schedulerView.getAllJobs().isEmpty());
+
+        scheduleMessage(10000, 1000, 10);
+        assertFalse(schedulerView.getAllJobs().isEmpty());
+
+        final TabularData scheduled = schedulerView.getAllJobs();
+        assertEquals(1, scheduled.size());
+
+        // Exactly one row exists (asserted above); the first element of its key is the job id.
+        final List<?> keyParts = (List<?>) scheduled.keySet().iterator().next();
+        final String jobId = keyParts.get(0).toString();
+
+        LOG.info("Attempting to get execution count for Job: {}", jobId);
+        assertEquals(0, schedulerView.getExecutionCount(jobId));
+
+        final Wait.Condition executedAtLeastOnce = new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return schedulerView.getExecutionCount(jobId) > 0;
+            }
+        };
+        assertTrue("Should execute again", Wait.waitFor(executedAtLeastOnce));
+    }
+
+    /**
+     * Always returns {@code true}.
+     * NOTE(review): presumably read by JobSchedulerTestSupport to decide whether
+     * the test broker is started with JMX enabled - confirm against the base class.
+     */
+    @Override
+    protected boolean isUseJmx() {
+        return true;
+    }
+
+    /**
+     * Sends one text message to the test destination carrying the scheduler
+     * delay, period and repeat properties.
+     *
+     * @param time   value stored in {@code AMQ_SCHEDULED_DELAY}
+     * @param period value stored in {@code AMQ_SCHEDULED_PERIOD}
+     * @param repeat value stored in {@code AMQ_SCHEDULED_REPEAT}
+     * @throws Exception if any JMS operation fails
+     */
+    protected void scheduleMessage(int time, int period, int repeat) throws Exception {
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
+            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
+            producer.send(message);
+        } finally {
+            // Always release the connection, even when creating or sending the message fails.
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
new file mode 100644
index 0000000..c82f8ef
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
+
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(JobSchedulerManagementTest.class);
+
+    /**
+     * Schedules a batch of delayed messages, sends a REMOVEALL request to the
+     * scheduler management destination and verifies that none of the messages
+     * is delivered afterwards.
+     */
+    @Test
+    public void testRemoveAllScheduled() throws Exception {
+        final int COUNT = 5;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Destination that receives the scheduler management requests
+        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+
+        // Send the remove request
+        MessageProducer producer = session.createProducer(management);
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+        producer.send(request);
+
+        // Now wait and see if any get delivered, none should. The await is
+        // expected to time out, so its return value is intentionally ignored.
+        latch.await(10, TimeUnit.SECONDS);
+        // assertEquals takes (expected, actual); keeping that order makes a
+        // failure message report the right values.
+        assertEquals(COUNT, latch.getCount());
+    }
+
+    /**
+     * Schedules three messages with delays of 6, 15 and 20 seconds, asks the
+     * scheduler to remove all jobs inside a window of 10 to 30 seconds from
+     * now, then browses the scheduler and watches the destination. Both the
+     * browse latch and the delivery latch are expected to end at a count of two.
+     */
+    @Test
+    public void testRemoveAllScheduledAtTime() throws Exception {
+        final int COUNT = 3;
+        final Connection connection = createConnection();
+
+        // Setup the scheduled Messages
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(15));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20));
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Management topic for the requests and a temporary queue for the browse replies
+        final Destination managementTopic = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        final Destination replyQueue = session.createTemporaryQueue();
+
+        // Consumer that counts the scheduled messages which actually get delivered
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        final MessageConsumer deliveryConsumer = session.createConsumer(destination);
+        deliveryConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message delivered) {
+                latch.countDown();
+            }
+        });
+
+        // Consumer that counts the replies to the browse request
+        final MessageConsumer browseConsumer = session.createConsumer(replyQueue);
+        final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+        browseConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                browsedLatch.countDown();
+                LOG.debug("Scheduled Message Browser got Message: " + message);
+            }
+        });
+
+        connection.start();
+
+        final long windowStart = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10);
+        final long windowEnd = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+
+        // Send the remove request, limited to the window computed above
+        final MessageProducer requestProducer = session.createProducer(managementTopic);
+        final Message removeRequest = session.createMessage();
+        removeRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+        removeRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(windowStart));
+        removeRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(windowEnd));
+        requestProducer.send(removeRequest);
+
+        // Send the browse request
+        final Message browseRequest = session.createMessage();
+        browseRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        browseRequest.setJMSReplyTo(replyQueue);
+        requestProducer.send(browseRequest);
+
+        // Wait on the delivery latch, then check that the browse returned only
+        // the one remaining message.
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(2, browsedLatch.getCount());
+
+        // Wait again and check that nothing beyond that one message was delivered.
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(2, latch.getCount());
+    }
+
+    /**
+     * Schedules a batch of delayed messages, browses the scheduler and checks
+     * that (a) browsing does not cause early delivery, (b) every scheduled
+     * message shows up on the browse reply destination and (c) all messages
+     * are eventually delivered to the real destination.
+     */
+    @Test
+    public void testBrowseAllScheduled() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        Destination browseDest = session.createTemporaryQueue();
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // Create the "Browser"
+        MessageConsumer browser = session.createConsumer(browseDest);
+        final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+        browser.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                browsedLatch.countDown();
+                LOG.debug("Scheduled Message Browser got Message: " + message);
+            }
+        });
+
+        connection.start();
+
+        // Send the browse request
+        MessageProducer producer = session.createProducer(requestBrowse);
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setJMSReplyTo(browseDest);
+        producer.send(request);
+
+        // make sure the message isn't delivered early because we browsed it
+        Thread.sleep(2000);
+        assertEquals(COUNT, latch.getCount());
+
+        // now see if we got all the scheduled messages on the browse
+        // destination. Wait on the browse latch itself so this check does not
+        // depend on when the scheduled messages happen to be delivered.
+        browsedLatch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, browsedLatch.getCount());
+
+        // now check that they all got delivered
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules one message at 5 seconds, ten at 10 seconds and one at 20
+     * seconds, then browses the scheduler with a window of 6 to 15 seconds from
+     * now. Checks that browsing does not trigger early delivery, that the
+     * browse latch (sized for the ten messages) reaches zero and that all
+     * twelve messages are eventually delivered.
+     */
+    @Test
+    public void testBrowseWindowlScheduled() throws Exception {
+        final int COUNT = 10;
+        final Connection connection = createConnection();
+
+        // Setup the scheduled Messages
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10), COUNT);
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20));
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Management topic for the browse request and a temporary queue for the replies
+        final Destination managementTopic = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        final Destination replyQueue = session.createTemporaryQueue();
+
+        // Consumer that counts every scheduled message that gets delivered
+        final CountDownLatch latch = new CountDownLatch(COUNT + 2);
+        final MessageConsumer deliveryConsumer = session.createConsumer(destination);
+        deliveryConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message delivered) {
+                latch.countDown();
+            }
+        });
+
+        // Consumer that counts the replies to the browse request
+        final MessageConsumer browseConsumer = session.createConsumer(replyQueue);
+        final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+        browseConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                browsedLatch.countDown();
+                LOG.debug("Scheduled Message Browser got Message: " + message);
+            }
+        });
+
+        connection.start();
+
+        final long windowStart = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(6);
+        final long windowEnd = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(15);
+
+        // Send the browse request, limited to the window computed above
+        final MessageProducer requestProducer = session.createProducer(managementTopic);
+        final Message browseRequest = session.createMessage();
+        browseRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        browseRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(windowStart));
+        browseRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(windowEnd));
+        browseRequest.setJMSReplyTo(replyQueue);
+        requestProducer.send(browseRequest);
+
+        // make sure the message isn't delivered early because we browsed it
+        Thread.sleep(2000);
+        assertEquals(COUNT + 2, latch.getCount());
+
+        // Wait on the delivery latch, then check that the browse replies for
+        // the window all arrived.
+        latch.await(15, TimeUnit.SECONDS);
+        assertEquals(0, browsedLatch.getCount());
+
+        // Finally check that every scheduled message was delivered.
+        latch.await(20, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules a batch of delayed messages, browses them and sends an
+     * individual REMOVE request for each browsed job id, then verifies that
+     * none of the messages is delivered.
+     */
+    @Test
+    public void testRemoveScheduled() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        Destination browseDest = session.createTemporaryQueue();
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(management);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // Create the "Browser" on its own session so it can be polled with
+        // receive() while the first session delivers to a listener.
+        Session browseSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer browser = browseSession.createConsumer(browseDest);
+
+        connection.start();
+
+        // Send the browse request
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setJMSReplyTo(browseDest);
+        producer.send(request);
+
+        // Browse all the Scheduled Messages.
+        for (int i = 0; i < COUNT; ++i) {
+            Message message = browser.receive(2000);
+            assertNotNull(message);
+
+            // A failure to build or send the remove request is propagated so the
+            // test reports the real cause instead of a later delivery-count mismatch.
+            Message remove = session.createMessage();
+            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
+            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
+            producer.send(remove);
+        }
+
+        // now check that they all got removed and are not delivered.
+        latch.await(11, TimeUnit.SECONDS);
+        assertEquals(COUNT, latch.getCount());
+    }
+
+    /**
+     * Sends a management request that carries a freshly generated (and thus
+     * never scheduled) job id and verifies that sending it does not raise an
+     * exception on the client.
+     */
+    @Test
+    public void testRemoveNotScheduled() throws Exception {
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Destination that receives the scheduler management requests
+        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+
+        MessageProducer producer = session.createProducer(management);
+
+        try {
+
+            // Send the remove request
+            // NOTE(review): the action is REMOVEALL although a single job id is
+            // supplied; confirm whether AMQ_SCHEDULER_ACTION_REMOVE was intended.
+            Message remove = session.createMessage();
+            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
+            producer.send(remove);
+        } catch (Exception e) {
+            // Keep the cause: log the stack trace and include the exception in the failure text.
+            LOG.error("Caught unexpected exception during remove of unscheduled message.", e);
+            fail("Caught unexpected exception during remove of unscheduled message: " + e);
+        }
+    }
+
+    /**
+     * Schedules four messages with different delays and browses the scheduler
+     * through a consumer whose selector matches only the 45 second delay;
+     * exactly one browse reply is expected.
+     */
+    @Test
+    public void testBrowseWithSelector() throws Exception {
+        final Connection connection = createConnection();
+
+        // Setup the scheduled Messages, one per delay (in seconds)
+        final long[] delaysInSeconds = {9, 10, 5, 45};
+        for (long seconds : delaysInSeconds) {
+            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(seconds));
+        }
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Management topic for the browse request and a temporary topic for the replies
+        final Destination managementTopic = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        final Destination replyTopic = session.createTemporaryTopic();
+
+        // The "Browser" only accepts replies whose scheduled delay is 45000 ms
+        final String selector = ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000";
+        final MessageConsumer browser = session.createConsumer(replyTopic, selector);
+
+        connection.start();
+
+        // Send the browse request
+        final MessageProducer requestProducer = session.createProducer(managementTopic);
+        final Message browseRequest = session.createMessage();
+        browseRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        browseRequest.setJMSReplyTo(replyTopic);
+        requestProducer.send(browseRequest);
+
+        // Now try and receive the one we selected
+        final Message selected = browser.receive(5000);
+        assertNotNull(selected);
+        assertEquals(45000, selected.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY));
+
+        // Nothing else may match the selector
+        final Message unexpected = browser.receive(5000);
+        assertNull(unexpected);
+    }
+
+    protected void scheduleMessage(Connection connection, long delay) throws 
Exception {
+        scheduleMessage(connection, delay, 1);
+    }
+
+    /**
+     * Sends the same delayed text message {@code count} times to the test
+     * destination over a session created for this call.
+     *
+     * @param connection connection on which the temporary session is created
+     * @param delay      value stored in {@code AMQ_SCHEDULED_DELAY}
+     * @param count      number of times the message is sent
+     * @throws Exception if any JMS operation fails
+     */
+    protected void scheduleMessage(Connection connection, long delay, int count) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+
+            for (int i = 0; i < count; ++i) {
+                producer.send(message);
+            }
+
+            producer.close();
+        } finally {
+            // The session exists only for this call; closing it also releases the
+            // producer when a send fails before producer.close() is reached.
+            session.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
new file mode 100644
index 0000000..c013a4c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobSchedulerStoreCheckpointTest {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(JobSchedulerStoreCheckpointTest.class);
+
+    private JobSchedulerStoreImpl store;
+    private JobScheduler scheduler;
+    private ByteSequence payload;
+
+    /**
+     * Prepares an empty store directory, starts the store on it and builds the
+     * 8 KiB payload used for every scheduled job.
+     */
+    @Before
+    public void setUp() throws Exception {
+        final File storeDir = new File("target/test/ScheduledJobsDB");
+        IOHelper.mkdirs(storeDir);
+        IOHelper.deleteChildren(storeDir);
+        startStore(storeDir);
+
+        // Fill the payload with a repeating 0..255 byte pattern.
+        final byte[] pattern = new byte[8192];
+        int index = 0;
+        while (index < pattern.length) {
+            pattern[index] = (byte) (index % 256);
+            ++index;
+        }
+
+        payload = new ByteSequence(pattern);
+    }
+
+    /**
+     * Creates and starts the job scheduler store on the given directory, then
+     * obtains the scheduler named "test" and starts its dispatching.
+     *
+     * @param directory directory handed to the store
+     * @throws Exception if the store or the scheduler fails to start
+     */
+    protected void startStore(File directory) throws Exception {
+        store = new JobSchedulerStoreImpl();
+        store.setDirectory(directory);
+        store.setCheckpointInterval(5000);
+        store.setCleanupInterval(10000);
+        // 10 KiB journal files against the 8 KiB payload built in setUp;
+        // presumably chosen so a handful of jobs spans several journal files - confirm.
+        store.setJournalMaxFileLength(10 * 1024);
+        store.start();
+        scheduler = store.getJobScheduler("test");
+        scheduler.startDispatching();
+    }
+
+    /**
+     * Returns the number of entries currently in the store journal's file map.
+     */
+    private int getNumJournalFiles() throws IOException {
+        return store.getJournal().getFileMap().size();
+    }
+
+    /**
+     * Stops dispatching and shuts the store down. Tolerates a setUp that
+     * failed part-way (fields still {@code null}) and always attempts to stop
+     * the store even when stopping the dispatcher throws.
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (scheduler != null) {
+                scheduler.stopDispatching();
+            }
+        } finally {
+            if (store != null) {
+                store.stop();
+            }
+        }
+    }
+
+    /**
+     * Schedules two consecutive batches of jobs, waits for each batch to fire
+     * and finally waits for the journal to shrink to a single file.
+     */
+    @Test
+    public void test() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch firstBatch = new CountDownLatch(COUNT);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                firstBatch.countDown();
+            }
+        });
+
+        long time = TimeUnit.SECONDS.toMillis(30);
+        for (int i = 0; i < COUNT; i++) {
+            scheduler.schedule("id" + i, payload, "", time, 0, 0);
+        }
+
+        int size = scheduler.getAllJobs().size();
+        assertEquals(COUNT, size);
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+        // jobs are scheduled 30 seconds out; allow generous slack
+        assertTrue(firstBatch.await(70, TimeUnit.SECONDS));
+        assertEquals(0, firstBatch.getCount());
+
+        // A CountDownLatch cannot be reset: once the first batch has fired, the
+        // first latch stays at zero and would make any further await return
+        // immediately. Count the second batch with a latch of its own.
+        final CountDownLatch secondBatch = new CountDownLatch(COUNT);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                secondBatch.countDown();
+            }
+        });
+
+        for (int i = 0; i < COUNT; i++) {
+            scheduler.schedule("id" + i, payload, "", time, 0, 0);
+        }
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+        // jobs are scheduled 30 seconds out; allow generous slack
+        assertTrue(secondBatch.await(70, TimeUnit.SECONDS));
+        assertEquals(0, secondBatch.getCount());
+
+        // Evaluate the wait first so the failure message reports the journal
+        // file count seen after the wait rather than before it.
+        final boolean journalCompacted = Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumJournalFiles() == 1;
+            }
+        }, TimeUnit.MINUTES.toMillis(2));
+        assertTrue("Should be only one log left: " + getNumJournalFiles(), journalCompacted);
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
new file mode 100644
index 0000000..df1e7ff
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that jobs written to a {@link JobSchedulerStoreImpl} are still present, with the
+ * same payloads, after the store has been stopped and started again.
+ */
+public class JobSchedulerStoreTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreTest.class);
+
+    /**
+     * Schedules {@code NUMBER} jobs, restarts the store and verifies that the job count and
+     * every payload (compared position by position) match what was scheduled.
+     *
+     * @throws Exception if the store cannot be started, stopped or queried
+     */
+    @Test(timeout = 120 * 1000)
+    public void testRestart() throws Exception {
+        JobSchedulerStore store = new JobSchedulerStoreImpl();
+        File directory = new File("target/test/ScheduledDB");
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store.setDirectory(directory);
+        final int NUMBER = 1000;
+        store.start();
+        // Tracks whether the store still needs stopping, so the finally block releases it
+        // exactly once even when an assertion fails half way through.
+        boolean running = true;
+        try {
+            List<ByteSequence> list = new ArrayList<ByteSequence>();
+            for (int i = 0; i < NUMBER; i++) {
+                list.add(new ByteSequence(("testjob" + i).getBytes()));
+            }
+
+            JobScheduler js = store.getJobScheduler("test");
+            js.startDispatching();
+            int count = 0;
+            // Delay and period are both ten minutes, far longer than the test timeout.
+            long startTime = 10 * 60 * 1000;
+            long period = startTime;
+            for (ByteSequence job : list) {
+                js.schedule("id:" + (count++), job, "", startTime, period, -1);
+            }
+
+            List<Job> test = js.getAllJobs();
+            LOG.debug("Found {} jobs in the store before restart", test.size());
+            assertEquals(list.size(), test.size());
+
+            running = false;
+            store.stop();
+
+            store.start();
+            running = true;
+            js = store.getJobScheduler("test");
+            test = js.getAllJobs();
+            LOG.debug("Found {} jobs in the store after restart", test.size());
+            assertEquals(list.size(), test.size());
+
+            for (int i = 0; i < list.size(); i++) {
+                String orig = new String(list.get(i).getData());
+                String payload = new String(test.get(i).getPayload());
+                assertEquals(orig, payload);
+            }
+        } finally {
+            if (running) {
+                store.stop();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
new file mode 100644
index 0000000..b84a782
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobSchedulerTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobSchedulerTest.class);
+
+    private JobSchedulerStore store;
+    private JobScheduler scheduler;
+
+    @Test
+    public void testAddLongStringByteSequence() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+
+        });
+        for (int i = 0; i < COUNT; i++) {
+            String test = new String("test" + i);
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 
1000);
+        }
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules a single job using a cron entry built from the wall-clock time one minute
+     * from now and waits up to 70 seconds for the listener to be called.
+     */
+    @Test
+    public void testAddCronAndByteSequence() throws Exception {
+        final CountDownLatch fired = new CountDownLatch(1);
+        final JobListener countingListener = new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                fired.countDown();
+            }
+        };
+        scheduler.addListener(countingListener);
+
+        final Calendar nextMinute = Calendar.getInstance();
+        nextMinute.add(Calendar.MINUTE, 1);
+
+        // Calendar.DAY_OF_WEEK is 1-based (SUNDAY == 1); the entry uses that value minus one.
+        final String cronEntry = String.format("%d %d * * %d",
+            nextMinute.get(Calendar.MINUTE),
+            nextMinute.get(Calendar.HOUR_OF_DAY),
+            nextMinute.get(Calendar.DAY_OF_WEEK) - 1);
+
+        final ByteSequence payload = new ByteSequence("test1".getBytes());
+        scheduler.schedule("id:1", payload, cronEntry, 0, 0, 0);
+
+        // need a little slack so go over 60 seconds
+        final boolean firedInTime = fired.await(70, TimeUnit.SECONDS);
+        assertTrue(firedInTime);
+        assertEquals(0, fired.getCount());
+    }
+
+    /**
+     * Schedules ten repeating jobs through the six-argument {@code schedule} overload
+     * (empty cron entry, 2000 / 10 / -1 as the trailing arguments) and expects the listener
+     * to have been called at least once per job.
+     */
+    @Test
+    public void testAddLongLongIntStringByteSequence() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+        long time = 2000;
+        for (int i = 0; i < COUNT; i++) {
+            String test = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 10, -1);
+        }
+        // Nothing may have fired before the initial delay has elapsed.
+        assertEquals(COUNT, latch.getCount());
+        // The previous wait of 3000 SECONDS (50 minutes) made a failing run hang; a bounded
+        // wait well above the 2000 delay is enough and its result is now asserted.
+        assertTrue("Jobs should fire after the initial delay", latch.await(30, TimeUnit.SECONDS));
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules ten jobs, stops the store, starts a new store on the same directory and
+     * expects the jobs to be delivered to a listener registered after the restart.
+     */
+    @Test
+    public void testAddStopThenDeliver() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        long time = 2000;
+        for (int i = 0; i < COUNT; i++) {
+            String test = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 1000, -1);
+        }
+        // Remember the directory before tearing down so the new store reopens the same data.
+        File directory = store.getDirectory();
+        tearDown();
+        startStore(directory);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+        assertEquals(COUNT, latch.getCount());
+        // The previous wait of 3000 SECONDS (50 minutes) made a failing run hang; use a
+        // bounded wait and assert its result instead.
+        assertTrue("Jobs should be delivered after the restart", latch.await(30, TimeUnit.SECONDS));
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules ten jobs and removes them by schedule time via {@code remove(long)},
+     * expecting the scheduler to be empty afterwards.
+     */
+    @Test
+    public void testRemoveLong() throws Exception {
+        final int COUNT = 10;
+
+        long time = 60000;
+        for (int i = 0; i < COUNT; i++) {
+            String str = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000, -1);
+        }
+
+        int size = scheduler.getAllJobs().size();
+        assertEquals(COUNT, size);
+
+        long removeTime = scheduler.getNextScheduleTime();
+        scheduler.remove(removeTime);
+
+        // If all jobs are not started within the same second we need to call remove again.
+        // Re-read the size first: the value captured above predates the removal, so the old
+        // check against it was always true.
+        size = scheduler.getAllJobs().size();
+        if (size != 0) {
+            removeTime = scheduler.getNextScheduleTime();
+            scheduler.remove(removeTime);
+        }
+
+        size = scheduler.getAllJobs().size();
+        assertEquals(0, size);
+    }
+
+    /**
+     * Schedules ten jobs plus one extra job with a known id, removes the extra job via
+     * {@code remove(String)} and checks that exactly the original ten remain.
+     */
+    @Test
+    public void testRemoveString() throws Exception {
+        final int COUNT = 10;
+        final String test = "TESTREMOVE";
+        long time = 20000;
+
+        for (int i = 0; i < COUNT; i++) {
+            String str = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000, -1);
+            if (i == COUNT / 2) {
+                // Insert the job under test in the middle of the batch.
+                scheduler.schedule(test, new ByteSequence(test.getBytes()), "", time, 1000, -1);
+            }
+        }
+
+        // assertEquals takes (expected, actual); the old order produced misleading messages.
+        int size = scheduler.getAllJobs().size();
+        assertEquals(COUNT + 1, size);
+        scheduler.remove(test);
+        size = scheduler.getAllJobs().size();
+        assertEquals(COUNT, size);
+    }
+
+    /**
+     * Schedules one job with a repeat argument of 10, waits for the listener to be called
+     * ten times and then checks the execution count reported by the {@link Job} obtained
+     * before the first firing.
+     */
+    @Test
+    public void testGetExecutionCount() throws Exception {
+        final String jobId = "Job-1";
+        long time = 10000;
+        final CountDownLatch done = new CountDownLatch(10);
+
+        String str = "test";
+        scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10);
+
+        // assertEquals takes (expected, actual).
+        int size = scheduler.getAllJobs().size();
+        assertEquals(1, size);
+
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                // The latch starts at 10, so 11 - count is the 1-based firing number.
+                LOG.info("Job exectued: {}", 11 - done.getCount());
+                done.countDown();
+            }
+        });
+
+        List<Job> jobs = scheduler.getNextScheduleJobs();
+        assertEquals(1, jobs.size());
+        Job job = jobs.get(0);
+        assertEquals(jobId, job.getJobId());
+        assertEquals(0, job.getExecutionCount());
+        assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS));
+        // The job is not updated on the last firing as it is removed from the store following
+        // its last execution so the count will always be one less than the max firings.
+        assertTrue(job.getExecutionCount() >= 9);
+    }
+
+    /**
+     * Schedules ten jobs with ids {@code id:0} .. {@code id:9} and checks that
+     * {@code getAllJobs()} returns all of them in that order.
+     */
+    @Test
+    public void testgetAllJobs() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        long time = 20000;
+
+        for (int i = 0; i < COUNT; i++) {
+            String str = "test" + i;
+            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", time, 10 + i, -1);
+        }
+
+        List<Job> list = scheduler.getAllJobs();
+
+        // assertEquals takes (expected, actual); the old order produced misleading messages.
+        assertEquals(COUNT, list.size());
+        int count = 0;
+        for (Job job : list) {
+            assertEquals(ID + count, job.getJobId());
+            count++;
+        }
+    }
+
+    /**
+     * Schedules ten jobs with staggered start arguments and checks that
+     * {@code getAllJobs(start, finish)} returns all of them, in id order, for a window that
+     * begins now and extends past the last start argument.
+     */
+    @Test
+    public void testgetAllJobsInRange() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        long start = 10000;
+
+        for (int i = 0; i < COUNT; i++) {
+            String str = "test" + i;
+            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i * 1000), 10000 + i, 0);
+        }
+
+        // From here on 'start' is the lower bound of the query window, not a schedule argument.
+        start = System.currentTimeMillis();
+        long finish = start + 12000 + (COUNT * 1000);
+        List<Job> list = scheduler.getAllJobs(start, finish);
+
+        assertEquals(COUNT, list.size());
+        int count = 0;
+        for (Job job : list) {
+            // assertEquals takes (expected, actual).
+            assertEquals(ID + count, job.getJobId());
+            count++;
+        }
+    }
+
+    /**
+     * Schedules ten jobs with staggered start arguments, calls
+     * {@code removeAllJobs(start, finish)} for a window that begins now and extends past the
+     * last start argument, and expects no jobs to remain.
+     */
+    @Test
+    public void testRemoveAllJobsInRange() throws Exception {
+        final int jobCount = 10;
+        final String idPrefix = "id:";
+        final long baseStart = 10000;
+
+        int index = 0;
+        while (index < jobCount) {
+            final byte[] body = ("test" + index).getBytes();
+            scheduler.schedule(idPrefix + index, new ByteSequence(body), "",
+                baseStart + (index * 1000), 10000 + index, 0);
+            index++;
+        }
+
+        final long windowStart = System.currentTimeMillis();
+        final long windowEnd = windowStart + 12000 + (jobCount * 1000);
+        scheduler.removeAllJobs(windowStart, windowEnd);
+
+        final List<Job> remaining = scheduler.getAllJobs();
+        assertTrue(remaining.isEmpty());
+    }
+
+    /**
+     * Creates the test directory if needed, deletes its children and starts a store and
+     * scheduler on it.
+     */
+    @Before
+    public void setUp() throws Exception {
+        final File storeDirectory = new File("target/test/ScheduledJobsDB");
+        IOHelper.mkdirs(storeDirectory);
+        IOHelper.deleteChildren(storeDirectory);
+        startStore(storeDirectory);
+    }
+
+    protected JobSchedulerStore createJobSchedulerStore() throws Exception {
+        return new JobSchedulerStoreImpl();
+    }
+
+    /**
+     * Creates and starts a store on the given directory, then obtains the scheduler named
+     * "test" from it and starts dispatching. Both are kept in the test's fields.
+     *
+     * @param directory directory handed to the store via {@code setDirectory}
+     */
+    protected void startStore(File directory) throws Exception {
+        this.store = createJobSchedulerStore();
+        this.store.setDirectory(directory);
+        this.store.start();
+
+        this.scheduler = this.store.getJobScheduler("test");
+        this.scheduler.startDispatching();
+    }
+
+    /**
+     * Stops dispatching and then stops the store.
+     *
+     * <p>Either field may still be {@code null} when {@code setUp} failed part way, so both
+     * are guarded; the store is stopped even if stopping the dispatcher throws.
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (scheduler != null) {
+                scheduler.stopDispatching();
+            }
+        } finally {
+            if (store != null) {
+                store.stop();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
new file mode 100644
index 0000000..5bf8d8c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Base class for tests of the Broker's JobSchedulerStore.
+ */
+public class JobSchedulerTestSupport {
+
+    @Rule public TestName name = new TestName();
+
+    protected String connectionUri;
+    protected BrokerService broker;
+    protected JobScheduler jobScheduler;
+    protected Queue destination;
+
+    @Before
+    public void setUp() throws Exception {
+        connectionUri = "vm://localhost";
+        destination = new ActiveMQQueue(name.getMethodName());
+
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        jobScheduler = broker.getJobSchedulerStore().getJobScheduler("JMS");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    protected Connection createConnection() throws Exception {
+        return createConnectionFactory().createConnection();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(true);
+    }
+
+    protected boolean isUseJmx() {
+        return false;
+    }
+
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception {
+        ObjectName objectName = broker.getAdminView().getJMSJobScheduler();
+        JobSchedulerViewMBean scheduler = null;
+        if (objectName != null) {
+            scheduler = (JobSchedulerViewMBean) broker.getManagementContext()
+                .newProxyInstance(objectName, JobSchedulerViewMBean.class, 
true);
+        }
+
+        return scheduler;
+    }
+
+    protected BrokerService createBroker(boolean delete) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        if (delete) {
+            IOHelper.mkdirs(schedulerDirectory);
+            IOHelper.deleteChildren(schedulerDirectory);
+        }
+
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(isPersistent());
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(isUseJmx());
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
new file mode 100644
index 0000000..996cc55
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ScheduledMessage;
+import org.junit.Test;
+
+/**
+ * Checks that messages carrying {@link ScheduledMessage#AMQ_SCHEDULED_DELAY} and sent from a
+ * transacted session are only delivered when the transaction commits.
+ */
+public class JobSchedulerTxTest extends JobSchedulerTestSupport {
+
+    /**
+     * Sends ten delayed messages in a transaction, rolls it back and expects none of them
+     * to reach the consumer.
+     */
+    @Test
+    public void testTxSendWithRollback() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            final CountDownLatch latch = new CountDownLatch(COUNT);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    latch.countDown();
+                }
+            });
+
+            connection.start();
+            long time = 5000;
+            Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = producerSession.createProducer(destination);
+
+            for (int i = 0; i < COUNT; ++i) {
+                TextMessage message = session.createTextMessage("test msg");
+                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+                producer.send(message);
+            }
+            producer.close();
+            producerSession.rollback();
+
+            // make sure the message isn't delivered early
+            Thread.sleep(2000);
+            assertEquals(COUNT, latch.getCount());
+            // Wait past the scheduled delay; nothing may arrive after a rollback.
+            latch.await(5, TimeUnit.SECONDS);
+            assertEquals(COUNT, latch.getCount());
+        } finally {
+            // Previously the connection was never closed.
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends ten delayed messages in a transaction, commits it and expects none of them to
+     * arrive early and all of them to arrive once the delay has elapsed.
+     */
+    @Test
+    public void testTxSendWithCommit() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            final CountDownLatch latch = new CountDownLatch(COUNT);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    latch.countDown();
+                }
+            });
+
+            connection.start();
+            long time = 5000;
+            Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = producerSession.createProducer(destination);
+
+            for (int i = 0; i < COUNT; ++i) {
+                TextMessage message = session.createTextMessage("test msg");
+                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+                producer.send(message);
+            }
+            producer.close();
+            producerSession.commit();
+
+            // make sure the message isn't delivered early
+            Thread.sleep(2000);
+            assertEquals(COUNT, latch.getCount());
+            latch.await(5, TimeUnit.SECONDS);
+            assertEquals(0, latch.getCount());
+        } finally {
+            // Previously the connection was never closed.
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
new file mode 100644
index 0000000..03e5c84
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KahaDBSchedulerIndexRebuildTest {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class);
+
+    private BrokerService broker = null;
+    private final int NUM_JOBS = 50;
+
+    static String basedir;
+    static {
+        try {
+            ProtectionDomain protectionDomain = 
SchedulerDBVersionTest.class.getProtectionDomain();
+            basedir = new File(new 
File(protectionDomain.getCodeSource().getLocation().getPath()), 
"../.").getCanonicalPath();
+        } catch (IOException e) {
+            basedir = ".";
+        }
+    }
+
+    private final File schedulerStoreDir = new File(basedir, 
"activemq-data/store/scheduler");
+    private final File storeDir = new File(basedir, "activemq-data/store/");
+
+    /**
+     * Logs the scheduler store directory used by the tests.
+     */
+    @Before
+    public void setUp() throws Exception {
+        final File testDir = schedulerStoreDir;
+        LOG.info("Test Dir = {}", testDir);
+    }
+
+    /**
+     * Stops the broker, if one was created, and waits until it has stopped so that a later
+     * test does not start on top of a broker that is still shutting down.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Schedules {@code NUM_JOBS} repeating jobs, stops the broker, deletes
+     * {@code scheduleDB.data} (presumably the scheduler index file - confirm against the
+     * store implementation) and expects a new broker on the same directory to report the
+     * same number of jobs.
+     */
+    @Test
+    public void testIndexRebuilds() throws Exception {
+        IOHelper.deleteFile(schedulerStoreDir);
+
+        JobSchedulerStoreImpl schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            for (int i = 0; i < NUM_JOBS; ++i) {
+                scheduleRepeating(connection);
+            }
+        } finally {
+            // Close the connection even when scheduling fails.
+            connection.close();
+        }
+
+        JobScheduler scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
+
+        // Wait for the broker to stop before touching its files.
+        broker.stop();
+        broker.waitUntilStopped();
+
+        IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data"));
+
+        schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+
+        scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
+    }
+
+    /**
+     * Schedules {@code NUM_JOBS} repeating and {@code NUM_JOBS} one-shot jobs, waits until
+     * only {@code NUM_JOBS} jobs remain, stops the broker, deletes {@code scheduleDB.data}
+     * (presumably the scheduler index file - confirm against the store implementation) and
+     * expects a new broker on the same directory to report {@code NUM_JOBS} jobs.
+     */
+    @Test
+    public void testIndexRebuildsAfterSomeJobsExpire() throws Exception {
+        IOHelper.deleteFile(schedulerStoreDir);
+
+        JobSchedulerStoreImpl schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            for (int i = 0; i < NUM_JOBS; ++i) {
+                scheduleRepeating(connection);
+                scheduleOneShot(connection);
+            }
+        } finally {
+            // Close the connection even when scheduling fails.
+            connection.close();
+        }
+
+        JobScheduler scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS * 2, scheduler.getAllJobs().size());
+
+        final JobScheduler awaitingOneShotTimeout = scheduler;
+        assertTrue("One shot jobs should time out", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return awaitingOneShotTimeout.getAllJobs().size() == NUM_JOBS;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+
+        // Wait for the broker to stop before touching its files.
+        broker.stop();
+        broker.waitUntilStopped();
+
+        IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data"));
+
+        schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+
+        scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
+    }
+
+    /**
+     * Sends one message to {@code test.queue} with a scheduled delay of 360 seconds, a
+     * period of 500 and a repeat value of -1.
+     *
+     * @param connection an open connection; a session is created on it for this call and
+     *                   closed again before returning
+     */
+    private void scheduleRepeating(Connection connection) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            Queue queue = session.createQueue("test.queue");
+            MessageProducer producer = session.createProducer(queue);
+
+            TextMessage message = session.createTextMessage("test msg");
+            long time = 360 * 1000;
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
+            producer.send(message);
+            producer.close();
+        } finally {
+            // One session per call used to stay open until the connection was closed.
+            session.close();
+        }
+    }
+
+    /**
+     * Sends one message to {@code test.queue} with a scheduled delay of 30 seconds and a
+     * repeat value of 0.
+     *
+     * @param connection an open connection; a session is created on it for this call and
+     *                   closed again before returning
+     */
+    private void scheduleOneShot(Connection connection) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            Queue queue = session.createQueue("test.queue");
+            MessageProducer producer = session.createProducer(queue);
+
+            TextMessage message = session.createTextMessage("test msg");
+            long time = TimeUnit.SECONDS.toMillis(30);
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0);
+            producer.send(message);
+            producer.close();
+        } finally {
+            // One session per call used to stay open until the connection was closed.
+            session.close();
+        }
+    }
+
+    protected JobSchedulerStoreImpl createScheduler() {
+        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+        scheduler.setDirectory(schedulerStoreDir);
+        scheduler.setJournalMaxFileLength(10 * 1024);
+        return scheduler;
+    }
+
+    protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) 
throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setJobSchedulerStore(scheduler);
+        answer.setPersistent(true);
+        answer.setDataDirectory(storeDir.getAbsolutePath());
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(false);
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
new file mode 100644
index 0000000..30da10d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the store recovers even if some log files are missing.
+ *
+ * Each test fills the scheduler journal until it spans {@code NUM_LOGS} data
+ * files, stops the broker, deletes some of the newer data files and then
+ * checks how a fresh broker reacts to the damaged journal.
+ */
+public class KahaDBSchedulerMissingJournalLogsTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerMissingJournalLogsTest.class);
+
+    /** Number of journal data files to reach before the tests stop producing. */
+    private static final int NUM_LOGS = 6;
+
+    private BrokerService broker = null;
+    private JobSchedulerStoreImpl schedulerStore = null;
+
+    static String basedir;
+    static {
+        try {
+            ProtectionDomain protectionDomain = KahaDBSchedulerMissingJournalLogsTest.class.getProtectionDomain();
+            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath();
+        } catch (IOException e) {
+            // The canonical path could not be resolved; fall back to the working directory.
+            basedir = ".";
+        }
+    }
+
+    private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler");
+    private final File storeDir = new File(basedir, "activemq-data/store/");
+
+    /**
+     * Deletes any previous scheduler store, then creates and starts a broker
+     * and remembers its scheduler store for the test.
+     *
+     * @throws java.lang.Exception if the broker cannot be started
+     */
+    @Before
+    public void setUp() throws Exception {
+        IOHelper.deleteFile(schedulerStoreDir);
+        LOG.info("Test Dir = {}", schedulerStoreDir);
+
+        createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        schedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
+    }
+
+    /**
+     * Stops whichever broker the test left in {@code broker}.
+     *
+     * @throws java.lang.Exception if stopping the broker fails
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * A broker using the default store settings must refuse to start once
+     * journal data files have been deleted.
+     */
+    @Test(timeout=120 * 1000)
+    public void testMissingLogsCausesBrokerToFail() throws Exception {
+        fillUpSomeLogFiles();
+
+        int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
+        LOG.info("There are {} jobs in the store.", jobCount);
+
+        stopBrokerAndDelete(selectNewestLogFiles(0));
+
+        try {
+            createBroker();
+            // createBroker() only configures the broker; without start() the
+            // store is never opened and fail() below would always be hit.
+            broker.start();
+            fail("Should not start when logs are missing.");
+        } catch (Exception expected) {
+            // Expected: startup is rejected because of the missing files.
+            // fail() throws an AssertionError, which is not an Exception, so
+            // an unexpected successful start still fails the test.
+            LOG.info("Broker failed to start as expected: {}", expected.toString());
+        }
+    }
+
+    /**
+     * With {@code ignoreMissingJournalfiles} enabled the broker must start and
+     * recover some, but not all, of the scheduled jobs.
+     */
+    @Test(timeout=120 * 1000)
+    public void testRecoverWhenSomeLogsAreMissing() throws Exception {
+        fillUpSomeLogFiles();
+
+        int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
+        LOG.info("There are {} jobs in the store.", jobCount);
+
+        // Unlike the test above, the file with the highest id is kept.
+        stopBrokerAndDelete(selectNewestLogFiles(1));
+
+        schedulerStore = createScheduler();
+        schedulerStore.setIgnoreMissingJournalfiles(true);
+
+        createBroker(schedulerStore);
+        broker.start();
+        broker.waitUntilStarted();
+
+        int postRecoverJobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
+        assertTrue(postRecoverJobCount > 0);
+        assertTrue(postRecoverJobCount < jobCount);
+    }
+
+    /**
+     * Collects the journal data files whose ids run from
+     * {@code size - skipNewest} down to (exclusive) {@code size / 2}.
+     * Must be called while the broker is still running.
+     *
+     * @param skipNewest how many of the highest ids to leave out
+     * @return the files selected for deletion
+     */
+    private List<File> selectNewestLogFiles(int skipNewest) throws Exception {
+        List<File> selected = new ArrayList<File>();
+        Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap();
+        // NOTE(review): indexing by i assumes the journal numbers its data
+        // files contiguously starting at 1 - confirm against the Journal.
+        for (int i = files.size() - skipNewest; i > files.size() / 2; i--) {
+            selected.add(files.get(i).getFile());
+        }
+        return selected;
+    }
+
+    /** Stops the running broker and then deletes the given files from disk. */
+    private void stopBrokerAndDelete(List<File> toDelete) throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+
+        for (File file : toDelete) {
+            LOG.info("File to delete: {}", file);
+            IOHelper.delete(file);
+        }
+    }
+
+    /**
+     * Keeps sending repeating scheduled messages until the scheduler journal
+     * consists of at least {@code NUM_LOGS} data files.
+     */
+    private void fillUpSomeLogFiles() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("test.queue");
+            MessageProducer producer = session.createProducer(queue);
+            connection.start();
+            // "<" rather than "!=" so the loop still ends if the file count
+            // ever steps past NUM_LOGS between two checks.
+            do {
+                scheduleRepeating(session, producer);
+            } while (schedulerStore.getJournal().getFileMap().size() < NUM_LOGS);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends one text message with the scheduled-delay property set to 360
+     * seconds, the period set to 500 and the repeat count set to -1.
+     */
+    private void scheduleRepeating(Session session, MessageProducer producer) throws Exception {
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 360 * 1000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
+        producer.send(message);
+    }
+
+    /**
+     * Creates a scheduler store on {@code schedulerStoreDir} with a journal
+     * max file length of 10 * 1024, so only a few messages fit in each file.
+     */
+    protected JobSchedulerStoreImpl createScheduler() {
+        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+        scheduler.setDirectory(schedulerStoreDir);
+        scheduler.setJournalMaxFileLength(10 * 1024);
+        return scheduler;
+    }
+
+    /** Configures (but does not start) a broker with a default scheduler store. */
+    protected void createBroker() throws Exception {
+        createBroker(createScheduler());
+    }
+
+    /**
+     * Configures (but does not start) a persistent broker that uses the given
+     * scheduler store and stores it in the {@code broker} field.
+     *
+     * @param scheduler the job scheduler store the broker should use
+     */
+    protected void createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
+        broker = new BrokerService();
+        broker.setJobSchedulerStore(scheduler);
+        broker.setPersistent(true);
+        broker.setDataDirectory(storeDir.getAbsolutePath());
+        broker.setSchedulerSupport(true);
+        broker.setUseJmx(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
new file mode 100644
index 0000000..53588b5
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Sends one message, restarts the broker while the message is (optionally)
+ * still waiting for its scheduled delivery time, and checks that exactly one
+ * message is received from the restarted broker.
+ */
+public class LostScheduledMessagesTest {
+
+    private BrokerService broker;
+
+    private static final File schedulerDirectory = new File("target/test/ScheduledDB");
+    private static final File messageDirectory = new File("target/test/MessageDB");
+    private static final String QUEUE_NAME = "test";
+
+    /** Makes sure both store directories exist and are empty. */
+    @Before
+    public void setup() throws Exception {
+        IOHelper.mkdirs(schedulerDirectory);
+        IOHelper.deleteChildren(schedulerDirectory);
+
+        IOHelper.mkdirs(messageDirectory);
+        IOHelper.deleteChildren(messageDirectory);
+    }
+
+    /**
+     * Creates and starts a persistent broker with scheduler support that keeps
+     * existing messages on startup and listens on "vm://localhost".
+     */
+    private void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setSchedulerSupport(true);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(false);
+        broker.setDataDirectory("target");
+        broker.setSchedulerDirectoryFile(schedulerDirectory);
+        broker.setDataDirectoryFile(messageDirectory);
+        broker.setUseJmx(false);
+        broker.addConnector("vm://localhost");
+        broker.start();
+    }
+
+    /** Stops the broker, if one was created, and resets the log4j configuration. */
+    @After
+    public void tearDown() throws Exception {
+        // broker stays null when a test fails before startBroker() assigns it.
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        BasicConfigurator.resetConfiguration();
+    }
+
+    @Test
+    public void MessagePassedNotUsingScheduling() throws Exception {
+        doTest(false);
+    }
+
+    @Test
+    public void MessageLostWhenUsingScheduling() throws Exception {
+        doTest(true);
+    }
+
+    /**
+     * Runs the send / restart / receive scenario.
+     *
+     * @param useScheduling whether the message carries a scheduled delivery delay
+     */
+    private void doTest(boolean useScheduling) throws Exception {
+
+        final int deliveryDelayMs = 5000;
+
+        startBroker();
+
+        long startTime = System.currentTimeMillis();
+
+        // Send a message scheduled for delivery in 5 seconds
+        sendMessage(useScheduling, deliveryDelayMs);
+
+        // shut down broker
+        broker.stop();
+        broker.waitUntilStopped();
+
+        // Make sure that broker have stopped within delivery delay
+        long shutdownTime = System.currentTimeMillis();
+        assertTrue("Failed to shut down broker in expected time. Test results inconclusive", shutdownTime - startTime < deliveryDelayMs);
+
+        // make sure that delivery falls into down time window
+        TimeUnit.MILLISECONDS.sleep(deliveryDelayMs);
+
+        // Start new broker instance
+        startBroker();
+
+        // Wait for a while to let MQ process the message
+        assertEquals(1, countReceived(deliveryDelayMs * 2));
+    }
+
+    /**
+     * Sends one text message to the test queue over a fresh connection.
+     *
+     * @param useScheduling whether to set the scheduled-delay property
+     * @param delayMs the delay to set when {@code useScheduling} is true
+     */
+    private void sendMessage(boolean useScheduling, long delayMs) throws Exception {
+        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+            Message message = session.createTextMessage("test");
+            if (useScheduling) {
+                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayMs);
+            }
+            producer.send(message);
+            session.close();
+        } finally {
+            // Always release the connection so a failed send cannot leak it.
+            connection.close();
+        }
+    }
+
+    /**
+     * Listens on the test queue for the given time and reports how many
+     * messages arrived.
+     *
+     * @param waitMs how long to keep the consumer open, in milliseconds
+     * @return the number of messages handed to the listener
+     */
+    private long countReceived(long waitMs) throws Exception {
+        final AtomicLong receiveCounter = new AtomicLong();
+
+        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    receiveCounter.incrementAndGet();
+                }
+            });
+
+            // The full wait is kept (rather than returning on the first
+            // message) so that duplicate deliveries would be counted too.
+            TimeUnit.MILLISECONDS.sleep(waitMs);
+
+            session.close();
+        } finally {
+            connection.close();
+        }
+
+        return receiveCounter.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
new file mode 100644
index 0000000..721f417
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts a broker on a copy of an archived scheduler store and checks that the
+ * repeating job stored in it still delivers messages, across several broker
+ * restarts.
+ */
+public class SchedulerDBVersionTest {
+    static String basedir;
+    static {
+        try {
+            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
+            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath();
+        } catch (IOException e) {
+            // The canonical path could not be resolved; fall back to the working directory.
+            basedir = ".";
+        }
+    }
+
+    static final Logger LOG = LoggerFactory.getLogger(SchedulerDBVersionTest.class);
+    static final File VERSION_LEGACY_JMS =
+        new File(basedir + "/src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
+
+    /** The broker of the running test; stopped again by {@link #tearDown()}. */
+    BrokerService broker = null;
+
+    /**
+     * Creates (but does not start) a persistent broker with scheduler support
+     * that uses the given scheduler store.
+     *
+     * @param scheduler the job scheduler store the broker should use
+     * @return the configured broker
+     */
+    protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setJobSchedulerStore(scheduler);
+        answer.setPersistent(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(false);
+        return answer;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Regenerates the archived store by scheduling one repeating message
+     * against a fresh store in the test resources directory.
+     */
+    @Ignore("Used only when a new version of the store needs to archive it's test data.")
+    @Test
+    public void testCreateStore() throws Exception {
+        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+        // Write to the same basedir-anchored location the conversion test
+        // reads from, instead of a path relative to the working directory.
+        File dir = VERSION_LEGACY_JMS;
+        IOHelper.deleteFile(dir);
+        scheduler.setDirectory(dir);
+        scheduler.setJournalMaxFileLength(1024 * 1024);
+        broker = createBroker(scheduler);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            scheduleRepeating(connection);
+        } finally {
+            connection.close();
+        }
+        broker.stop();
+    }
+
+    /**
+     * Sends one text message to "test.queue" with the scheduled-delay property
+     * set to 1000, the period set to 500 and the repeat count set to -1.
+     */
+    private void scheduleRepeating(Connection connection) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test.queue");
+        MessageProducer producer = session.createProducer(queue);
+
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 1000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
+        producer.send(message);
+        producer.close();
+    }
+
+    @Test
+    public void testLegacyStoreConversion() throws Exception {
+        doTestScheduleRepeated(VERSION_LEGACY_JMS);
+    }
+
+    /**
+     * Copies {@code existingStore} into a scratch directory, then three times
+     * in a row starts a broker on it and expects ten messages on "test.queue"
+     * within 30 seconds.
+     *
+     * @param existingStore the archived scheduler store to start from
+     */
+    public void doTestScheduleRepeated(File existingStore) throws Exception {
+        File testDir = new File("target/activemq-data/store/scheduler/versionDB");
+        IOHelper.deleteFile(testDir);
+        IOHelper.copyFile(existingStore, testDir);
+
+        final int NUMBER = 10;
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+
+        for (int i = 0; i < 3; ++i) {
+            JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+            scheduler.setDirectory(testDir);
+            scheduler.setJournalMaxFileLength(1024 * 1024);
+            // Assign the field, not a shadowing local, so tearDown() can stop
+            // this broker when the iteration fails part-way through.
+            broker = createBroker(scheduler);
+            broker.start();
+            broker.waitUntilStarted();
+
+            final AtomicInteger count = new AtomicInteger();
+            final CountDownLatch latch = new CountDownLatch(NUMBER);
+
+            Connection connection = cf.createConnection();
+            try {
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue queue = session.createQueue("test.queue");
+
+                MessageConsumer consumer = session.createConsumer(queue);
+                consumer.setMessageListener(new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        LOG.info("Received scheduled message: {}", message);
+                        latch.countDown();
+                        count.incrementAndGet();
+                    }
+                });
+
+                connection.start();
+                // Expected value first, so a failure message reads correctly.
+                assertEquals(NUMBER, latch.getCount());
+                latch.await(30, TimeUnit.SECONDS);
+            } finally {
+                connection.close();
+            }
+
+            broker.stop();
+            broker.waitUntilStopped();
+
+            assertEquals(0, latch.getCount());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java
new file mode 100644
index 0000000..5144203
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JmsSchedulerTest;
+
+/**
+ * Runs the inherited {@link JmsSchedulerTest} cases against the In-Memory
+ * Scheduler variant by reporting a non-persistent configuration.
+ */
+public class InMemeoryJmsSchedulerTest extends JmsSchedulerTest {
+
+    /** @return always {@code false}: this variant runs without persistence */
+    @Override
+    protected boolean isPersistent() {
+        return false;
+    }
+
+    /** Intentionally empty override of the inherited restart test. */
+    @Override
+    public void testScheduleRestart() throws Exception {
+        // Scheduled jobs are not persisted here, so none survive a restart.
+    }
+
+    /** Intentionally empty override of the inherited store usage test. */
+    @Override
+    public void testJobSchedulerStoreUsage() throws Exception {
+        // The in-memory store has no store usage numbers to check.
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java
new file mode 100644
index 0000000..a3b7d04
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JmsCronSchedulerTest;
+
+/**
+ * Runs the inherited {@link JmsCronSchedulerTest} cases with persistence
+ * switched off, i.e. the in memory version of the cron scheduler test.
+ */
+public class InMemoryJmsCronSchedulerTest extends JmsCronSchedulerTest {
+
+    /** @return always {@code false}: this variant runs without persistence */
+    @Override
+    protected boolean isPersistent() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java
new file mode 100644
index 0000000..46f5540
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerJmxManagementTests;
+
+/**
+ * Runs the inherited {@link JobSchedulerJmxManagementTests} cases with
+ * persistence switched off, covering the In-Memory scheduler's JMX
+ * management features.
+ */
+public class InMemoryJobSchedulerJmxManagementTests extends JobSchedulerJmxManagementTests {
+
+    /** @return always {@code false}: this variant runs without persistence */
+    @Override
+    protected boolean isPersistent() {
+        return false;
+    }
+}

Reply via email to