http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java new file mode 100644 index 0000000..92563f4 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.scheduler.legacy; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used to upgrade a Legacy Job Scheduler store to the latest version this class + * loads a found legacy scheduler store and generates new add commands for all + * jobs currently in the store. + */ +public class LegacyStoreReplayer { + + static final Logger LOG = LoggerFactory.getLogger(LegacyStoreReplayer.class); + + private LegacyJobSchedulerStoreImpl store; + private final File legacyStoreDirectory; + + /** + * Creates a new Legacy Store Replayer with the given target store + * @param targetStore + * @param directory + */ + public LegacyStoreReplayer(File directory) { + this.legacyStoreDirectory = directory; + } + + /** + * Loads the legacy store and prepares it for replay into a newer Store instance. + * + * @throws IOException if an error occurs while reading in the legacy store. + */ + public void load() throws IOException { + + store = new LegacyJobSchedulerStoreImpl(); + store.setDirectory(legacyStoreDirectory); + store.setFailIfDatabaseIsLocked(true); + + try { + store.start(); + } catch (IOException ioe) { + LOG.warn("Legacy store load failed: ", ioe); + throw ioe; + } catch (Exception e) { + LOG.warn("Legacy store load failed: ", e); + throw new IOException(e); + } + } + + /** + * Unloads a previously loaded legacy store to release any resources associated with it. + * + * Once a store is unloaded it cannot be replayed again until it has been reloaded. + * @throws IOException + */ + public void unload() throws IOException { + + if (store != null) { + try { + store.stop(); + } catch (Exception e) { + LOG.warn("Legacy store unload failed: ", e); + throw new IOException(e); + } finally { + store = null; + } + } + } + + /** + * Performs a replay of scheduled jobs into the target JobSchedulerStore. + * + * @param targetStore + * The JobSchedulerStore that will receive the replay events from the legacy store. + * + * @throws IOException if an error occurs during replay of the legacy store. + */ + public void startReplay(JobSchedulerStoreImpl targetStore) throws IOException { + checkLoaded(); + + if (targetStore == null) { + throw new IOException("Cannot replay to a null store"); + } + + try { + Set<String> schedulers = store.getJobSchedulerNames(); + if (!schedulers.isEmpty()) { + + for (String name : schedulers) { + LegacyJobSchedulerImpl scheduler = store.getJobScheduler(name); + LOG.info("Replay of legacy store {} starting.", name); + replayScheduler(scheduler, targetStore); + } + } + + LOG.info("Replay of legacy store complate."); + } catch (IOException ioe) { + LOG.warn("Failed during replay of legacy store: ", ioe); + throw ioe; + } catch (Exception e) { + LOG.warn("Failed during replay of legacy store: ", e); + throw new IOException(e); + } + } + + private final void replayScheduler(LegacyJobSchedulerImpl legacy, JobSchedulerStoreImpl target) throws Exception { + List<LegacyJobImpl> jobs = legacy.getAllJobs(); + + String schedulerName = legacy.getName(); + + for (LegacyJobImpl job : jobs) { + LOG.trace("Storing job from legacy store to new store: {}", job); + KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand(); + newJob.setScheduler(schedulerName); + newJob.setJobId(job.getJobId()); + newJob.setStartTime(job.getStartTime()); + newJob.setCronEntry(job.getCronEntry()); + newJob.setDelay(job.getDelay()); + newJob.setPeriod(job.getPeriod()); + newJob.setRepeat(job.getRepeat()); + newJob.setNextExecutionTime(job.getNextExecutionTime()); + newJob.setPayload(job.getPayload()); + + target.store(newJob); + } + } + + private final void checkLoaded() throws IOException { + if (this.store == null) { + throw new IOException("Cannot replay until legacy store is loaded."); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/proto/journal-data.proto ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto index 8290c4c..01607a5 100644 --- a/activemq-kahadb-store/src/main/proto/journal-data.proto +++ b/activemq-kahadb-store/src/main/proto/journal-data.proto @@ -32,6 +32,11 @@ enum KahaEntryType { KAHA_PRODUCER_AUDIT_COMMAND = 8; KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9; KAHA_UPDATE_MESSAGE_COMMAND = 10; + KAHA_ADD_SCHEDULED_JOB_COMMAND = 11; + KAHA_RESCHEDULE_JOB_COMMAND = 12; + KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13; + KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14; + KAHA_DESTROY_SCHEDULER_COMMAND = 15; } message KahaTraceCommand { @@ -179,6 +184,62 @@ message KahaLocation { required int32 offset = 2; } +message KahaAddScheduledJobCommand { + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddScheduledJobCommand>"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required string scheduler=1; + required string job_id=2; + required int64 start_time=3; + required string cron_entry=4; + required int64 delay=5; + required int64 period=6; + required int32 repeat=7; + required bytes payload=8; + required int64 next_execution_time=9; +} + +message KahaRescheduleJobCommand { + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRescheduleJobCommand>"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required string scheduler=1; + required string job_id=2; + required int64 execution_time=3; + required int64 next_execution_time=4; + required int32 rescheduled_count=5; +} + +message KahaRemoveScheduledJobCommand { + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobCommand>"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required string scheduler=1; + required string job_id=2; + required int64 next_execution_time=3; +} + +message KahaRemoveScheduledJobsCommand { + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobsCommand>"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required string scheduler=1; + required int64 start_time=2; + required int64 end_time=3; +} + +message KahaDestroySchedulerCommand { + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaDestroySchedulerCommand>"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required string scheduler=1; +} + // TODO things to ponder // should we move more message fields // that are set by the sender (and rarely required by the broker http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 42b25c6..01d5170 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -35,6 +35,7 @@ import org.apache.activemq.leveldb.util.Log import org.apache.activemq.store.PList.PListIterator import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream} import org.fusesource.hawtdispatch; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; object LevelDBStore extends Log { val DEFAULT_DIRECTORY = new File("LevelDB"); @@ -604,6 +605,10 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P rc } + def createJobSchedulerStore():JobSchedulerStore = { + throw new UnsupportedOperationException(); + } + def removeTopicMessageStore(destination: ActiveMQTopic): Unit = { topics.remove(destination).foreach { store=> store.subscriptions.values.foreach { sub => http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala index bd3904f..efd55f3 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala @@ -25,6 +25,7 @@ import java.io.File import java.io.IOException import java.util.Set import org.apache.activemq.util.{ServiceStopper, ServiceSupport} +import org.apache.activemq.broker.scheduler.JobSchedulerStore /** */ @@ -44,6 +45,10 @@ abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServi return proxy_target.createTopicMessageStore(destination) } + def createJobSchedulerStore():JobSchedulerStore = { + return proxy_target.createJobSchedulerStore() + } + def setDirectory(dir: File) { proxy_target.setDirectory(dir) } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java index 04277bf..641ee97 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java @@ -39,6 +39,7 @@ public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport { BrokerService broker = super.createBroker(); broker.setSchedulerSupport(true); + broker.setDataDirectory("target"); broker.setSchedulerDirectoryFile(schedulerDirectory); broker.getSystemUsage().getStoreUsage().setLimit(1 * 512); broker.deleteAllMessages(); http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java new file mode 100644 index 0000000..8adb980 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.openmbean.TabularData; + +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; +import org.apache.activemq.util.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests of the JMX JobSchedulerStore management MBean. + */ +public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerJmxManagementTests.class); + + @Test + public void testJobSchedulerMBeanIsRegistered() throws Exception { + JobSchedulerViewMBean view = getJobSchedulerMBean(); + assertNotNull(view); + assertTrue(view.getAllJobs().isEmpty()); + } + + @Test + public void testGetNumberOfJobs() throws Exception { + JobSchedulerViewMBean view = getJobSchedulerMBean(); + assertNotNull(view); + assertTrue(view.getAllJobs().isEmpty()); + scheduleMessage(60000, -1, -1); + assertFalse(view.getAllJobs().isEmpty()); + assertEquals(1, view.getAllJobs().size()); + scheduleMessage(60000, -1, -1); + assertEquals(2, view.getAllJobs().size()); + } + + @Test + public void testRemvoeJob() throws Exception { + JobSchedulerViewMBean view = getJobSchedulerMBean(); + assertNotNull(view); + assertTrue(view.getAllJobs().isEmpty()); + scheduleMessage(60000, -1, -1); + assertFalse(view.getAllJobs().isEmpty()); + TabularData jobs = view.getAllJobs(); + assertEquals(1, jobs.size()); + for (Object key : jobs.keySet()) { + String jobId = ((List<?>)key).get(0).toString(); + LOG.info("Attempting to remove Job: {}", jobId); + view.removeJob(jobId); + } + assertTrue(view.getAllJobs().isEmpty()); + } + + @Test + public void testRemvoeJobInRange() throws Exception { + JobSchedulerViewMBean view = getJobSchedulerMBean(); + assertNotNull(view); + assertTrue(view.getAllJobs().isEmpty()); + scheduleMessage(60000, -1, -1); + assertFalse(view.getAllJobs().isEmpty()); + String now = JobSupport.getDateTime(System.currentTimeMillis()); + String later = JobSupport.getDateTime(System.currentTimeMillis() + 120 * 1000); + view.removeAllJobs(now, later); + assertTrue(view.getAllJobs().isEmpty()); + } + + @Test + public void testGetNextScheduledJob() throws Exception { + JobSchedulerViewMBean view = getJobSchedulerMBean(); + assertNotNull(view); + assertTrue(view.getAllJobs().isEmpty()); + scheduleMessage(60000, -1, -1); + assertFalse(view.getAllJobs().isEmpty()); + long before = System.currentTimeMillis() + 57 * 1000; + long toLate = System.currentTimeMillis() + 63 * 1000; + String next = view.getNextScheduleTime(); + long nextTime = JobSupport.getDataTime(next); + LOG.info("Next Scheduled Time: {}", next); + assertTrue(nextTime > before); + assertTrue(nextTime < toLate); + } + + @Test + public void testGetExecutionCount() throws Exception { + final JobSchedulerViewMBean view = getJobSchedulerMBean(); + assertNotNull(view); + assertTrue(view.getAllJobs().isEmpty()); + scheduleMessage(10000, 1000, 10); + assertFalse(view.getAllJobs().isEmpty()); + TabularData jobs = view.getAllJobs(); + assertEquals(1, jobs.size()); + String jobId = null; + for (Object key : jobs.keySet()) { + jobId = ((List<?>)key).get(0).toString(); + } + + final String fixedJobId = jobId; + LOG.info("Attempting to get execution count for Job: {}", jobId); + assertEquals(0, view.getExecutionCount(jobId)); + + assertTrue("Should execute again", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return view.getExecutionCount(fixedJobId) > 0; + } + })); + } + + @Override + protected boolean isUseJmx() { + return true; + } + + protected void scheduleMessage(int time, int period, int repeat) throws Exception { + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("test msg"); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); + message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); + producer.send(message); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java index bc89d9e..c82f8ef 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java @@ -16,7 +16,11 @@ */ package org.apache.activemq.broker.scheduler; -import java.io.File; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -29,18 +33,17 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IdGenerator; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { +public class JobSchedulerManagementTest extends JobSchedulerTestSupport { private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class); + @Test public void testRemoveAllScheduled() throws Exception { final int COUNT = 5; Connection connection = createConnection(); @@ -77,6 +80,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { assertEquals(latch.getCount(), COUNT); } + @Test public void testRemoveAllScheduledAtTime() throws Exception { final int COUNT = 3; Connection connection = createConnection(); @@ -122,8 +126,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { // Send the remove request MessageProducer producer = session.createProducer(management); Message request = session.createMessage(); - request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, - ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start)); request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end)); producer.send(request); @@ -143,6 +146,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { assertEquals(2, latch.getCount()); } + @Test public void testBrowseAllScheduled() throws Exception { final int COUNT = 10; Connection connection = createConnection(); @@ -191,7 +195,8 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { Thread.sleep(2000); assertEquals(latch.getCount(), COUNT); - // now see if we got all the scheduled messages on the browse destination. + // now see if we got all the scheduled messages on the browse + // destination. latch.await(10, TimeUnit.SECONDS); assertEquals(browsedLatch.getCount(), 0); @@ -200,6 +205,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { assertEquals(latch.getCount(), 0); } + @Test public void testBrowseWindowlScheduled() throws Exception { final int COUNT = 10; Connection connection = createConnection(); @@ -255,15 +261,18 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { Thread.sleep(2000); assertEquals(COUNT + 2, latch.getCount()); - // now see if we got all the scheduled messages on the browse destination. + // now see if we got all the scheduled messages on the browse + // destination. latch.await(15, TimeUnit.SECONDS); assertEquals(0, browsedLatch.getCount()); - // now see if we got all the scheduled messages on the browse destination. + // now see if we got all the scheduled messages on the browse + // destination. latch.await(20, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); } + @Test public void testRemoveScheduled() throws Exception { final int COUNT = 10; Connection connection = createConnection(); @@ -297,8 +306,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { // Send the browse request Message request = session.createMessage(); - request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, - ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); request.setJMSReplyTo(browseDest); producer.send(request); @@ -307,14 +315,12 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { Message message = browser.receive(2000); assertNotNull(message); - try{ + try { Message remove = session.createMessage(); - remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, - ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE); - remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, - message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID)); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID)); producer.send(remove); - } catch(Exception e) { + } catch (Exception e) { } } @@ -323,6 +329,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { assertEquals(COUNT, latch.getCount()); } + @Test public void testRemoveNotScheduled() throws Exception { Connection connection = createConnection(); @@ -333,19 +340,19 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { MessageProducer producer = session.createProducer(management); - try{ + try { // Send the remove request Message remove = session.createMessage(); - remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, - ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId()); producer.send(remove); - } catch(Exception e) { + } catch (Exception e) { fail("Caught unexpected exception during remove of unscheduled message."); } } + @Test public void testBrowseWithSelector() throws Exception { Connection connection = createConnection(); @@ -362,7 +369,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { Destination browseDest = session.createTemporaryTopic(); // Create the "Browser" - MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000" ); + MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000"); connection.start(); @@ -383,7 +390,6 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { assertNull(message); } - protected void scheduleMessage(Connection connection, long delay) throws Exception { scheduleMessage(connection, delay, 1); } @@ -394,38 +400,10 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { TextMessage message = session.createTextMessage("test msg"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); - for(int i = 0; i < count; ++i ) { + for (int i = 0; i < count; ++i) { producer.send(message); } producer.close(); } - - @Override - protected void setUp() throws Exception { - bindAddress = "vm://localhost"; - super.setUp(); - } - - @Override - protected BrokerService createBroker() throws Exception { - return createBroker(true); - } - - protected BrokerService createBroker(boolean delete) throws Exception { - File schedulerDirectory = new File("target/scheduler"); - if (delete) { - IOHelper.mkdirs(schedulerDirectory); - IOHelper.deleteChildren(schedulerDirectory); - } - BrokerService answer = new BrokerService(); - answer.setPersistent(true); - answer.setDeleteAllMessagesOnStartup(true); - answer.setDataDirectory("target"); - answer.setSchedulerDirectoryFile(schedulerDirectory); - answer.setSchedulerSupport(true); - answer.setUseJmx(false); - answer.addConnector(bindAddress); - return answer; - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java new file mode 100644 index 0000000..c013a4c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobSchedulerStoreCheckpointTest { + + static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreCheckpointTest.class); + + private JobSchedulerStoreImpl store; + private JobScheduler scheduler; + private ByteSequence payload; + + @Before + public void setUp() throws Exception { + File directory = new File("target/test/ScheduledJobsDB"); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + startStore(directory); + + byte[] data = new byte[8192]; + for (int i = 0; i < data.length; ++i) { + data[i] = (byte) (i % 256); + } + + payload = new ByteSequence(data); + } + + protected void startStore(File directory) throws Exception { + store = new JobSchedulerStoreImpl(); + store.setDirectory(directory); + store.setCheckpointInterval(5000); + store.setCleanupInterval(10000); + store.setJournalMaxFileLength(10 * 1024); + store.start(); + scheduler = store.getJobScheduler("test"); + scheduler.startDispatching(); + } + + private int getNumJournalFiles() throws IOException { + return store.getJournal().getFileMap().size(); + } + + @After + public void tearDown() throws Exception { + scheduler.stopDispatching(); + store.stop(); + } + + @Test + public void test() throws Exception { + final int COUNT = 10; + final CountDownLatch latch = new CountDownLatch(COUNT); + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + }); + + long time = TimeUnit.SECONDS.toMillis(30); + for (int i = 0; i < COUNT; i++) { + scheduler.schedule("id" + i, payload, "", time, 0, 0); + } + + int size = scheduler.getAllJobs().size(); + assertEquals(size, COUNT); + + LOG.info("Number of journal log files: {}", getNumJournalFiles()); + // need a little slack so go over 60 seconds + assertTrue(latch.await(70, TimeUnit.SECONDS)); + assertEquals(0, latch.getCount()); + + for (int i = 0; i < COUNT; i++) { + scheduler.schedule("id" + i, payload, "", time, 0, 0); + } + + LOG.info("Number of journal log files: {}", getNumJournalFiles()); + // need a little slack so go over 60 seconds + assertTrue(latch.await(70, TimeUnit.SECONDS)); + assertEquals(0, latch.getCount()); + + assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getNumJournalFiles() == 1; + } + }, TimeUnit.MINUTES.toMillis(2))); + + LOG.info("Number of journal log files: {}", getNumJournalFiles()); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java index 0e0c1d7..df1e7ff 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java @@ -16,50 +16,62 @@ */ package org.apache.activemq.broker.scheduler; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.util.ArrayList; import java.util.List; -import junit.framework.TestCase; - import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobSchedulerStoreTest { -public class JobSchedulerStoreTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreTest.class); + @Test(timeout = 120 * 1000) public void testRestart() throws Exception { JobSchedulerStore store = new JobSchedulerStoreImpl(); File directory = new File("target/test/ScheduledDB"); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store.setDirectory(directory); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + store.setDirectory(directory); final int NUMBER = 1000; store.start(); - List<ByteSequence>list = new ArrayList<ByteSequence>(); - for (int i = 0; i < NUMBER;i++ ) { - ByteSequence buff = new ByteSequence(new String("testjob"+i).getBytes()); + List<ByteSequence> list = new ArrayList<ByteSequence>(); + for (int i = 0; i < NUMBER; i++) { + ByteSequence buff = new ByteSequence(new String("testjob" + i).getBytes()); list.add(buff); } + JobScheduler js = store.getJobScheduler("test"); js.startDispatching(); int count = 0; - long startTime = 10 * 60 * 1000; long period = startTime; - for (ByteSequence job:list) { - js.schedule("id:"+(count++), job, "", startTime, period, -1); + long startTime = 10 * 60 * 1000; + long period = startTime; + for (ByteSequence job : list) { + js.schedule("id:" + (count++), job, "", startTime, period, -1); } - List<Job>test = js.getAllJobs(); - assertEquals(list.size(),test.size()); + + List<Job> test = js.getAllJobs(); + LOG.debug("Found {} jobs in the store before restart", test.size()); + assertEquals(list.size(), test.size()); store.stop(); store.start(); js = store.getJobScheduler("test"); test = js.getAllJobs(); - assertEquals(list.size(),test.size()); - for (int i = 0; i < list.size();i++) { + LOG.debug("Found {} jobs in the store after restart", test.size()); + assertEquals(list.size(), test.size()); + + for (int i = 0; i < list.size(); i++) { String orig = new String(list.get(i).getData()); String payload = new String(test.get(i).getPayload()); - assertEquals(orig,payload); + assertEquals(orig, payload); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java index 5126970..2210eba 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java @@ -31,8 +31,13 @@ import org.apache.activemq.util.IOHelper; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JobSchedulerTest { + + private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerTest.class); + private JobSchedulerStore store; private JobScheduler scheduler; @@ -173,6 +178,37 @@ public class JobSchedulerTest { } @Test + public void testGetExecutionCount() throws Exception { + final String jobId = "Job-1"; + long time = 10000; + final CountDownLatch done = new CountDownLatch(10); + + String str = new String("test"); + scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10); + + int size = scheduler.getAllJobs().size(); + assertEquals(size, 1); + + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + LOG.info("Job exectued: {}", 11 - done.getCount()); + done.countDown(); + } + }); + + List<Job> jobs = scheduler.getNextScheduleJobs(); + assertEquals(1, jobs.size()); + Job job = jobs.get(0); + assertEquals(jobId, job.getJobId()); + assertEquals(0, job.getExecutionCount()); + assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS)); + // The job is not updated on the last firing as it is removed from the store following + // it's last execution so the count will always be one less than the max firings. + assertTrue(job.getExecutionCount() >= 9); + } + + @Test public void testgetAllJobs() throws Exception { final int COUNT = 10; final String ID = "id:"; http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java new file mode 100644 index 0000000..2b25797 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.scheduler; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Queue; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; + +/** + * Base class for tests of the Broker's JobSchedulerStore. + */ +public class JobSchedulerTestSupport { + + @Rule public TestName name = new TestName(); + + protected String connectionUri; + protected BrokerService broker; + protected JobScheduler jobScheduler; + protected Queue destination; + + @Before + public void setUp() throws Exception { + connectionUri = "vm://localhost"; + destination = new ActiveMQQueue(name.getMethodName()); + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + jobScheduler = broker.getJobSchedulerStore().getJobScheduler("JMS"); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + protected Connection createConnection() throws Exception { + return createConnectionFactory().createConnection(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + protected BrokerService createBroker() throws Exception { + return createBroker(true); + } + + protected boolean isUseJmx() { + return false; + } + + protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception { + ObjectName objectName = broker.getAdminView().getJMSJobScheduler(); + JobSchedulerViewMBean scheduler = null; + if (objectName != null) { + scheduler = (JobSchedulerViewMBean) broker.getManagementContext() + .newProxyInstance(objectName, JobSchedulerViewMBean.class, true); + } + + return scheduler; + } + + protected BrokerService createBroker(boolean delete) throws Exception { + File schedulerDirectory = new File("target/scheduler"); + if (delete) { + IOHelper.mkdirs(schedulerDirectory); + IOHelper.deleteChildren(schedulerDirectory); + } + + BrokerService answer = new BrokerService(); + answer.setPersistent(true); + answer.setDeleteAllMessagesOnStartup(true); + answer.setDataDirectory("target"); + answer.setSchedulerDirectoryFile(schedulerDirectory); + answer.setSchedulerSupport(true); + answer.setUseJmx(isUseJmx()); + return answer; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java new file mode 100644 index 0000000..9d0fef2 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java @@ -0,0 +1,179 @@ +package org.apache.activemq.broker.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.security.ProtectionDomain; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KahaDBSchedulerIndexRebuildTest { + + static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class); + + private BrokerService broker = null; + private final int NUM_JOBS = 50; + + static String basedir; + static { + try { + ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain(); + basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath(); + } catch (IOException e) { + basedir = "."; + } + } + + private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler"); + private final File storeDir = new File(basedir, "activemq-data/store/"); + + @Before + public void setUp() throws Exception { + LOG.info("Test Dir = {}", schedulerStoreDir); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + @Test + public void testIndexRebuilds() throws Exception { + IOHelper.deleteFile(schedulerStoreDir); + + JobSchedulerStoreImpl schedulerStore = createScheduler(); + broker = createBroker(schedulerStore); + broker.start(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + connection.start(); + for (int i = 0; i < NUM_JOBS; ++i) { + scheduleRepeating(connection); + } + connection.close(); + + JobScheduler scheduler = schedulerStore.getJobScheduler("JMS"); + assertNotNull(scheduler); + assertEquals(NUM_JOBS, scheduler.getAllJobs().size()); + + broker.stop(); + + IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data")); + + schedulerStore = createScheduler(); + broker = createBroker(schedulerStore); + broker.start(); + + scheduler = schedulerStore.getJobScheduler("JMS"); + assertNotNull(scheduler); + assertEquals(NUM_JOBS, scheduler.getAllJobs().size()); + } + + @Test + public void testIndexRebuildsAfterSomeJobsExpire() throws Exception { + IOHelper.deleteFile(schedulerStoreDir); + + JobSchedulerStoreImpl schedulerStore = createScheduler(); + broker = createBroker(schedulerStore); + broker.start(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + connection.start(); + for (int i = 0; i < NUM_JOBS; ++i) { + scheduleRepeating(connection); + scheduleOneShot(connection); + } + connection.close(); + + JobScheduler scheduler = schedulerStore.getJobScheduler("JMS"); + assertNotNull(scheduler); + assertEquals(NUM_JOBS * 2, scheduler.getAllJobs().size()); + + final JobScheduler awaitingOneShotTimeout = scheduler; + assertTrue("One shot jobs should time out", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return awaitingOneShotTimeout.getAllJobs().size() == NUM_JOBS; + } + }, TimeUnit.MINUTES.toMillis(2))); + + broker.stop(); + + IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data")); + + schedulerStore = createScheduler(); + broker = createBroker(schedulerStore); + broker.start(); + + scheduler = schedulerStore.getJobScheduler("JMS"); + assertNotNull(scheduler); + assertEquals(NUM_JOBS, scheduler.getAllJobs().size()); + } + + private void scheduleRepeating(Connection connection) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test.queue"); + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("test msg"); + long time = 360 * 1000; + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500); + message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1); + producer.send(message); + producer.close(); + } + + private void scheduleOneShot(Connection connection) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test.queue"); + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("test msg"); + long time = TimeUnit.SECONDS.toMillis(30); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0); + producer.send(message); + producer.close(); + } + + protected JobSchedulerStoreImpl createScheduler() { + JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); + scheduler.setDirectory(schedulerStoreDir); + scheduler.setJournalMaxFileLength(10 * 1024); + return scheduler; + } + + protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception { + BrokerService answer = new BrokerService(); + answer.setJobSchedulerStore(scheduler); + answer.setPersistent(true); + answer.setDataDirectory(storeDir.getAbsolutePath()); + answer.setSchedulerSupport(true); + answer.setUseJmx(false); + return answer; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java new file mode 100644 index 0000000..30da10d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.scheduler; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.security.ProtectionDomain; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *Test that the store recovers even if some log files are missing. + */ +public class KahaDBSchedulerMissingJournalLogsTest { + + static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class); + + private BrokerService broker = null; + private JobSchedulerStoreImpl schedulerStore = null; + + private final int NUM_LOGS = 6; + + static String basedir; + static { + try { + ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain(); + basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath(); + } catch (IOException e) { + basedir = "."; + } + } + + private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler"); + private final File storeDir = new File(basedir, "activemq-data/store/"); + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + IOHelper.deleteFile(schedulerStoreDir); + LOG.info("Test Dir = {}", schedulerStoreDir); + + createBroker(); + broker.start(); + broker.waitUntilStarted(); + + schedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore(); + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test(timeout=120 * 1000) + public void testMissingLogsCausesBrokerToFail() throws Exception { + fillUpSomeLogFiles(); + + int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size(); + LOG.info("There are {} jobs in the store.", jobCount); + + List<File> toDelete = new ArrayList<File>(); + Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap(); + for (int i = files.size(); i > files.size() / 2; i--) { + toDelete.add(files.get(i).getFile()); + } + + broker.stop(); + broker.waitUntilStopped(); + + for (File file : toDelete) { + LOG.info("File to delete: {}", file); + IOHelper.delete(file); + } + + try { + createBroker(); + fail("Should not start when logs are missing."); + } catch (Exception e) { + } + } + + @Test(timeout=120 * 1000) + public void testRecoverWhenSomeLogsAreMissing() throws Exception { + fillUpSomeLogFiles(); + + int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size(); + LOG.info("There are {} jobs in the store.", jobCount); + + List<File> toDelete = new ArrayList<File>(); + Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap(); + for (int i = files.size() - 1; i > files.size() / 2; i--) { + toDelete.add(files.get(i).getFile()); + } + + broker.stop(); + broker.waitUntilStopped(); + + for (File file : toDelete) { + LOG.info("File to delete: {}", file); + IOHelper.delete(file); + } + + schedulerStore = createScheduler(); + schedulerStore.setIgnoreMissingJournalfiles(true); + + createBroker(schedulerStore); + broker.start(); + broker.waitUntilStarted(); + + int postRecoverJobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size(); + assertTrue(postRecoverJobCount > 0); + assertTrue(postRecoverJobCount < jobCount); + } + + private void fillUpSomeLogFiles() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test.queue"); + MessageProducer producer = session.createProducer(queue); + connection.start(); + while (true) { + scheduleRepeating(session, producer); + if (schedulerStore.getJournal().getFileMap().size() == NUM_LOGS) { + break; + } + } + connection.close(); + } + + private void scheduleRepeating(Session session, MessageProducer producer) throws Exception { + TextMessage message = session.createTextMessage("test msg"); + long time = 360 * 1000; + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500); + message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1); + producer.send(message); + } + + protected JobSchedulerStoreImpl createScheduler() { + JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); + scheduler.setDirectory(schedulerStoreDir); + scheduler.setJournalMaxFileLength(10 * 1024); + return scheduler; + } + + protected void createBroker() throws Exception { + createBroker(createScheduler()); + } + + protected void createBroker(JobSchedulerStoreImpl scheduler) throws Exception { + broker = new BrokerService(); + broker.setJobSchedulerStore(scheduler); + broker.setPersistent(true); + broker.setDataDirectory(storeDir.getAbsolutePath()); + broker.setSchedulerSupport(true); + broker.setUseJmx(false); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java new file mode 100644 index 0000000..721f417 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.security.ProtectionDomain; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SchedulerDBVersionTest { + static String basedir; + static { + try { + ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain(); + basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath(); + } catch (IOException e) { + basedir = "."; + } + } + + static final Logger LOG = LoggerFactory.getLogger(SchedulerDBVersionTest.class); + final static File VERSION_LEGACY_JMS = + new File(basedir + "/src/test/resources/org/apache/activemq/store/schedulerDB/legacy"); + + BrokerService broker = null; + + protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception { + BrokerService answer = new BrokerService(); + answer.setJobSchedulerStore(scheduler); + answer.setPersistent(true); + answer.setDataDirectory("target"); + answer.setSchedulerSupport(true); + answer.setUseJmx(false); + return answer; + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + @Ignore("Used only when a new version of the store needs to archive it's test data.") + @Test + public void testCreateStore() throws Exception { + JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); + File dir = new File("src/test/resources/org/apache/activemq/store/schedulerDB/legacy"); + IOHelper.deleteFile(dir); + scheduler.setDirectory(dir); + scheduler.setJournalMaxFileLength(1024 * 1024); + broker = createBroker(scheduler); + broker.start(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + connection.start(); + scheduleRepeating(connection); + connection.close(); + broker.stop(); + } + + private void scheduleRepeating(Connection connection) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test.queue"); + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("test msg"); + long time = 1000; + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500); + message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1); + producer.send(message); + producer.close(); + } + + @Test + public void testLegacyStoreConversion() throws Exception { + doTestScheduleRepeated(VERSION_LEGACY_JMS); + } + + public void doTestScheduleRepeated(File existingStore) throws Exception { + File testDir = new File("target/activemq-data/store/scheduler/versionDB"); + IOHelper.deleteFile(testDir); + IOHelper.copyFile(existingStore, testDir); + + final int NUMBER = 10; + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + + for (int i = 0; i < 3; ++i) { + JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); + scheduler.setDirectory(testDir); + scheduler.setJournalMaxFileLength(1024 * 1024); + BrokerService broker = createBroker(scheduler); + broker.start(); + broker.waitUntilStarted(); + + final AtomicInteger count = new AtomicInteger(); + Connection connection = cf.createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test.queue"); + + MessageConsumer consumer = session.createConsumer(queue); + + final CountDownLatch latch = new CountDownLatch(NUMBER); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + LOG.info("Received scheduled message: {}", message); + latch.countDown(); + count.incrementAndGet(); + } + }); + + connection.start(); + assertEquals(latch.getCount(), NUMBER); + latch.await(30, TimeUnit.SECONDS); + + connection.close(); + broker.stop(); + broker.waitUntilStopped(); + + assertEquals(0, latch.getCount()); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/log4j.properties b/activemq-unit-tests/src/test/resources/log4j.properties index 564ed9e..85516aa 100755 --- a/activemq-unit-tests/src/test/resources/log4j.properties +++ b/activemq-unit-tests/src/test/resources/log4j.properties @@ -21,6 +21,7 @@ log4j.rootLogger=INFO, out, stdout #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG +#log4j.logger.org.apache.activemq.store.kahadb.scheduler=DEBUG #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG #log4j.logger.org.apache.activemq.transport.failover=TRACE #log4j.logger.org.apache.activemq.store.jdbc=TRACE http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log new file mode 100644 index 0000000..342f8c7 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log differ http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data new file mode 100644 index 0000000..30c937d Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data differ http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo new file mode 100644 index 0000000..b06e549 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo differ
