Repository: activemq Updated Branches: refs/heads/activemq-5.10.x 76357bdb1 -> 3424e04fa
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java deleted file mode 100644 index 92563f4..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.scheduler.legacy; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Set; - -import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Used to upgrade a Legacy Job Scheduler store to the latest version this class - * loads a found legacy scheduler store and generates new add commands for all - * jobs currently in the store. - */ -public class LegacyStoreReplayer { - - static final Logger LOG = LoggerFactory.getLogger(LegacyStoreReplayer.class); - - private LegacyJobSchedulerStoreImpl store; - private final File legacyStoreDirectory; - - /** - * Creates a new Legacy Store Replayer with the given target store - * @param targetStore - * @param directory - */ - public LegacyStoreReplayer(File directory) { - this.legacyStoreDirectory = directory; - } - - /** - * Loads the legacy store and prepares it for replay into a newer Store instance. - * - * @throws IOException if an error occurs while reading in the legacy store. - */ - public void load() throws IOException { - - store = new LegacyJobSchedulerStoreImpl(); - store.setDirectory(legacyStoreDirectory); - store.setFailIfDatabaseIsLocked(true); - - try { - store.start(); - } catch (IOException ioe) { - LOG.warn("Legacy store load failed: ", ioe); - throw ioe; - } catch (Exception e) { - LOG.warn("Legacy store load failed: ", e); - throw new IOException(e); - } - } - - /** - * Unloads a previously loaded legacy store to release any resources associated with it. - * - * Once a store is unloaded it cannot be replayed again until it has been reloaded. - * @throws IOException - */ - public void unload() throws IOException { - - if (store != null) { - try { - store.stop(); - } catch (Exception e) { - LOG.warn("Legacy store unload failed: ", e); - throw new IOException(e); - } finally { - store = null; - } - } - } - - /** - * Performs a replay of scheduled jobs into the target JobSchedulerStore. - * - * @param targetStore - * The JobSchedulerStore that will receive the replay events from the legacy store. - * - * @throws IOException if an error occurs during replay of the legacy store. - */ - public void startReplay(JobSchedulerStoreImpl targetStore) throws IOException { - checkLoaded(); - - if (targetStore == null) { - throw new IOException("Cannot replay to a null store"); - } - - try { - Set<String> schedulers = store.getJobSchedulerNames(); - if (!schedulers.isEmpty()) { - - for (String name : schedulers) { - LegacyJobSchedulerImpl scheduler = store.getJobScheduler(name); - LOG.info("Replay of legacy store {} starting.", name); - replayScheduler(scheduler, targetStore); - } - } - - LOG.info("Replay of legacy store complate."); - } catch (IOException ioe) { - LOG.warn("Failed during replay of legacy store: ", ioe); - throw ioe; - } catch (Exception e) { - LOG.warn("Failed during replay of legacy store: ", e); - throw new IOException(e); - } - } - - private final void replayScheduler(LegacyJobSchedulerImpl legacy, JobSchedulerStoreImpl target) throws Exception { - List<LegacyJobImpl> jobs = legacy.getAllJobs(); - - String schedulerName = legacy.getName(); - - for (LegacyJobImpl job : jobs) { - LOG.trace("Storing job from legacy store to new store: {}", job); - KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand(); - newJob.setScheduler(schedulerName); - newJob.setJobId(job.getJobId()); - newJob.setStartTime(job.getStartTime()); - newJob.setCronEntry(job.getCronEntry()); - newJob.setDelay(job.getDelay()); - newJob.setPeriod(job.getPeriod()); - newJob.setRepeat(job.getRepeat()); - newJob.setNextExecutionTime(job.getNextExecutionTime()); - newJob.setPayload(job.getPayload()); - - target.store(newJob); - } - } - - private final void checkLoaded() throws IOException { - if (this.store == null) { - throw new IOException("Cannot replay until legacy store is loaded."); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/proto/journal-data.proto ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto index 01607a5..8290c4c 100644 --- a/activemq-kahadb-store/src/main/proto/journal-data.proto +++ b/activemq-kahadb-store/src/main/proto/journal-data.proto @@ -32,11 +32,6 @@ enum KahaEntryType { KAHA_PRODUCER_AUDIT_COMMAND = 8; KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9; KAHA_UPDATE_MESSAGE_COMMAND = 10; - KAHA_ADD_SCHEDULED_JOB_COMMAND = 11; - KAHA_RESCHEDULE_JOB_COMMAND = 12; - KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13; - KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14; - KAHA_DESTROY_SCHEDULER_COMMAND = 15; } message KahaTraceCommand { @@ -184,62 +179,6 @@ message KahaLocation { required int32 offset = 2; } -message KahaAddScheduledJobCommand { - //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddScheduledJobCommand>"; - //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; - //| option java_type_method = "KahaEntryType"; - - required string scheduler=1; - required string job_id=2; - required int64 start_time=3; - required string cron_entry=4; - required int64 delay=5; - required int64 period=6; - required int32 repeat=7; - required bytes payload=8; - required int64 next_execution_time=9; -} - -message KahaRescheduleJobCommand { - //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRescheduleJobCommand>"; - //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; - //| option java_type_method = "KahaEntryType"; - - required string scheduler=1; - required string job_id=2; - required int64 execution_time=3; - required int64 next_execution_time=4; - required int32 rescheduled_count=5; -} - -message KahaRemoveScheduledJobCommand { - //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobCommand>"; - //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; - //| option java_type_method = "KahaEntryType"; - - required string scheduler=1; - required string job_id=2; - required int64 next_execution_time=3; -} - -message KahaRemoveScheduledJobsCommand { - //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobsCommand>"; - //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; - //| option java_type_method = "KahaEntryType"; - - required string scheduler=1; - required int64 start_time=2; - required int64 end_time=3; -} - -message KahaDestroySchedulerCommand { - //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaDestroySchedulerCommand>"; - //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; - //| option java_type_method = "KahaEntryType"; - - required string scheduler=1; -} - // TODO things to ponder // should we move more message fields // that are set by the sender (and rarely required by the broker http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 01d5170..42b25c6 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -35,7 +35,6 @@ import org.apache.activemq.leveldb.util.Log import org.apache.activemq.store.PList.PListIterator import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream} import org.fusesource.hawtdispatch; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; object LevelDBStore extends Log { val DEFAULT_DIRECTORY = new File("LevelDB"); @@ -605,10 +604,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P rc } - def createJobSchedulerStore():JobSchedulerStore = { - throw new UnsupportedOperationException(); - } - def removeTopicMessageStore(destination: ActiveMQTopic): Unit = { topics.remove(destination).foreach { store=> store.subscriptions.values.foreach { sub => http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala index efd55f3..bd3904f 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala @@ -25,7 +25,6 @@ import java.io.File import java.io.IOException import java.util.Set import org.apache.activemq.util.{ServiceStopper, ServiceSupport} -import org.apache.activemq.broker.scheduler.JobSchedulerStore /** */ @@ -45,10 +44,6 @@ abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServi return proxy_target.createTopicMessageStore(destination) } - def createJobSchedulerStore():JobSchedulerStore = { - return proxy_target.createJobSchedulerStore() - } - def setDirectory(dir: File) { proxy_target.setDirectory(dir) } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java index 641ee97..04277bf 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java @@ -39,7 +39,6 @@ public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport { BrokerService broker = super.createBroker(); broker.setSchedulerSupport(true); - broker.setDataDirectory("target"); broker.setSchedulerDirectoryFile(schedulerDirectory); broker.getSystemUsage().getStoreUsage().setLimit(1 * 512); broker.deleteAllMessages(); http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java deleted file mode 100644 index 8adb980..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.broker.scheduler; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import javax.jms.Connection; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.openmbean.TabularData; - -import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; -import org.apache.activemq.util.Wait; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tests of the JMX JobSchedulerStore management MBean. - */ -public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerJmxManagementTests.class); - - @Test - public void testJobSchedulerMBeanIsRegistered() throws Exception { - JobSchedulerViewMBean view = getJobSchedulerMBean(); - assertNotNull(view); - assertTrue(view.getAllJobs().isEmpty()); - } - - @Test - public void testGetNumberOfJobs() throws Exception { - JobSchedulerViewMBean view = getJobSchedulerMBean(); - assertNotNull(view); - assertTrue(view.getAllJobs().isEmpty()); - scheduleMessage(60000, -1, -1); - assertFalse(view.getAllJobs().isEmpty()); - assertEquals(1, view.getAllJobs().size()); - scheduleMessage(60000, -1, -1); - assertEquals(2, view.getAllJobs().size()); - } - - @Test - public void testRemvoeJob() throws Exception { - JobSchedulerViewMBean view = getJobSchedulerMBean(); - assertNotNull(view); - assertTrue(view.getAllJobs().isEmpty()); - scheduleMessage(60000, -1, -1); - assertFalse(view.getAllJobs().isEmpty()); - TabularData jobs = view.getAllJobs(); - assertEquals(1, jobs.size()); - for (Object key : jobs.keySet()) { - String jobId = ((List<?>)key).get(0).toString(); - LOG.info("Attempting to remove Job: {}", jobId); - view.removeJob(jobId); - } - assertTrue(view.getAllJobs().isEmpty()); - } - - @Test - public void testRemvoeJobInRange() throws Exception { - JobSchedulerViewMBean view = getJobSchedulerMBean(); - assertNotNull(view); - assertTrue(view.getAllJobs().isEmpty()); - scheduleMessage(60000, -1, -1); - assertFalse(view.getAllJobs().isEmpty()); - String now = JobSupport.getDateTime(System.currentTimeMillis()); - String later = JobSupport.getDateTime(System.currentTimeMillis() + 120 * 1000); - view.removeAllJobs(now, later); - assertTrue(view.getAllJobs().isEmpty()); - } - - @Test - public void testGetNextScheduledJob() throws Exception { - JobSchedulerViewMBean view = getJobSchedulerMBean(); - assertNotNull(view); - assertTrue(view.getAllJobs().isEmpty()); - scheduleMessage(60000, -1, -1); - assertFalse(view.getAllJobs().isEmpty()); - long before = System.currentTimeMillis() + 57 * 1000; - long toLate = System.currentTimeMillis() + 63 * 1000; - String next = view.getNextScheduleTime(); - long nextTime = JobSupport.getDataTime(next); - LOG.info("Next Scheduled Time: {}", next); - assertTrue(nextTime > before); - assertTrue(nextTime < toLate); - } - - @Test - public void testGetExecutionCount() throws Exception { - final JobSchedulerViewMBean view = getJobSchedulerMBean(); - assertNotNull(view); - assertTrue(view.getAllJobs().isEmpty()); - scheduleMessage(10000, 1000, 10); - assertFalse(view.getAllJobs().isEmpty()); - TabularData jobs = view.getAllJobs(); - assertEquals(1, jobs.size()); - String jobId = null; - for (Object key : jobs.keySet()) { - jobId = ((List<?>)key).get(0).toString(); - } - - final String fixedJobId = jobId; - LOG.info("Attempting to get execution count for Job: {}", jobId); - assertEquals(0, view.getExecutionCount(jobId)); - - assertTrue("Should execute again", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return view.getExecutionCount(fixedJobId) > 0; - } - })); - } - - @Override - protected boolean isUseJmx() { - return true; - } - - protected void scheduleMessage(int time, int period, int repeat) throws Exception { - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - TextMessage message = session.createTextMessage("test msg"); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); - message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); - producer.send(message); - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java index c82f8ef..bc89d9e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java @@ -16,11 +16,7 @@ */ package org.apache.activemq.broker.scheduler; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - +import java.io.File; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,17 +29,18 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IdGenerator; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JobSchedulerManagementTest extends JobSchedulerTestSupport { +public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class); - @Test public void testRemoveAllScheduled() throws Exception { final int COUNT = 5; Connection connection = createConnection(); @@ -80,7 +77,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { assertEquals(latch.getCount(), COUNT); } - @Test public void testRemoveAllScheduledAtTime() throws Exception { final int COUNT = 3; Connection connection = createConnection(); @@ -126,7 +122,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { // Send the remove request MessageProducer producer = session.createProducer(management); Message request = session.createMessage(); - request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start)); request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end)); producer.send(request); @@ -146,7 +143,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { assertEquals(2, latch.getCount()); } - @Test public void testBrowseAllScheduled() throws Exception { final int COUNT = 10; Connection connection = createConnection(); @@ -195,8 +191,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { Thread.sleep(2000); assertEquals(latch.getCount(), COUNT); - // now see if we got all the scheduled messages on the browse - // destination. + // now see if we got all the scheduled messages on the browse destination. latch.await(10, TimeUnit.SECONDS); assertEquals(browsedLatch.getCount(), 0); @@ -205,7 +200,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { assertEquals(latch.getCount(), 0); } - @Test public void testBrowseWindowlScheduled() throws Exception { final int COUNT = 10; Connection connection = createConnection(); @@ -261,18 +255,15 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { Thread.sleep(2000); assertEquals(COUNT + 2, latch.getCount()); - // now see if we got all the scheduled messages on the browse - // destination. + // now see if we got all the scheduled messages on the browse destination. latch.await(15, TimeUnit.SECONDS); assertEquals(0, browsedLatch.getCount()); - // now see if we got all the scheduled messages on the browse - // destination. + // now see if we got all the scheduled messages on the browse destination. latch.await(20, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); } - @Test public void testRemoveScheduled() throws Exception { final int COUNT = 10; Connection connection = createConnection(); @@ -306,7 +297,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { // Send the browse request Message request = session.createMessage(); - request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); request.setJMSReplyTo(browseDest); producer.send(request); @@ -315,12 +307,14 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { Message message = browser.receive(2000); assertNotNull(message); - try { + try{ Message remove = session.createMessage(); - remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE); - remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID)); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, + message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID)); producer.send(remove); - } catch (Exception e) { + } catch(Exception e) { } } @@ -329,7 +323,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { assertEquals(COUNT, latch.getCount()); } - @Test public void testRemoveNotScheduled() throws Exception { Connection connection = createConnection(); @@ -340,19 +333,19 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { MessageProducer producer = session.createProducer(management); - try { + try{ // Send the remove request Message remove = session.createMessage(); - remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId()); producer.send(remove); - } catch (Exception e) { + } catch(Exception e) { fail("Caught unexpected exception during remove of unscheduled message."); } } - @Test public void testBrowseWithSelector() throws Exception { Connection connection = createConnection(); @@ -369,7 +362,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { Destination browseDest = session.createTemporaryTopic(); // Create the "Browser" - MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000"); + MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000" ); connection.start(); @@ -390,6 +383,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { assertNull(message); } + protected void scheduleMessage(Connection connection, long delay) throws Exception { scheduleMessage(connection, delay, 1); } @@ -400,10 +394,38 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { TextMessage message = session.createTextMessage("test msg"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); - for (int i = 0; i < count; ++i) { + for(int i = 0; i < count; ++i ) { producer.send(message); } producer.close(); } + + @Override + protected void setUp() throws Exception { + bindAddress = "vm://localhost"; + super.setUp(); + } + + @Override + protected BrokerService createBroker() throws Exception { + return createBroker(true); + } + + protected BrokerService createBroker(boolean delete) throws Exception { + File schedulerDirectory = new File("target/scheduler"); + if (delete) { + IOHelper.mkdirs(schedulerDirectory); + IOHelper.deleteChildren(schedulerDirectory); + } + BrokerService answer = new BrokerService(); + answer.setPersistent(true); + answer.setDeleteAllMessagesOnStartup(true); + answer.setDataDirectory("target"); + answer.setSchedulerDirectoryFile(schedulerDirectory); + answer.setSchedulerSupport(true); + answer.setUseJmx(false); + answer.addConnector(bindAddress); + return answer; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java deleted file mode 100644 index c013a4c..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.broker.scheduler; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JobSchedulerStoreCheckpointTest { - - static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreCheckpointTest.class); - - private JobSchedulerStoreImpl store; - private JobScheduler scheduler; - private ByteSequence payload; - - @Before - public void setUp() throws Exception { - File directory = new File("target/test/ScheduledJobsDB"); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - startStore(directory); - - byte[] data = new byte[8192]; - for (int i = 0; i < data.length; ++i) { - data[i] = (byte) (i % 256); - } - - payload = new ByteSequence(data); - } - - protected void startStore(File directory) throws Exception { - store = new JobSchedulerStoreImpl(); - store.setDirectory(directory); - store.setCheckpointInterval(5000); - store.setCleanupInterval(10000); - store.setJournalMaxFileLength(10 * 1024); - store.start(); - scheduler = store.getJobScheduler("test"); - scheduler.startDispatching(); - } - - private int getNumJournalFiles() throws IOException { - return store.getJournal().getFileMap().size(); - } - - @After - public void tearDown() throws Exception { - scheduler.stopDispatching(); - store.stop(); - } - - @Test - public void test() throws Exception { - final int COUNT = 10; - final CountDownLatch latch = new CountDownLatch(COUNT); - scheduler.addListener(new JobListener() { - @Override - public void scheduledJob(String id, ByteSequence job) { - latch.countDown(); - } - }); - - long time = TimeUnit.SECONDS.toMillis(30); - for (int i = 0; i < COUNT; i++) { - scheduler.schedule("id" + i, payload, "", time, 0, 0); - } - - int size = scheduler.getAllJobs().size(); - assertEquals(size, COUNT); - - LOG.info("Number of journal log files: {}", getNumJournalFiles()); - // need a little slack so go over 60 seconds - assertTrue(latch.await(70, TimeUnit.SECONDS)); - assertEquals(0, latch.getCount()); - - for (int i = 0; i < COUNT; i++) { - scheduler.schedule("id" + i, payload, "", time, 0, 0); - } - - LOG.info("Number of journal log files: {}", getNumJournalFiles()); - // need a little slack so go over 60 seconds - assertTrue(latch.await(70, TimeUnit.SECONDS)); - assertEquals(0, latch.getCount()); - - assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return getNumJournalFiles() == 1; - } - }, TimeUnit.MINUTES.toMillis(2))); - - LOG.info("Number of journal log files: {}", getNumJournalFiles()); - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java index df1e7ff..0e0c1d7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java @@ -16,62 +16,50 @@ */ package org.apache.activemq.broker.scheduler; -import static org.junit.Assert.assertEquals; - import java.io.File; import java.util.ArrayList; import java.util.List; +import junit.framework.TestCase; + import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JobSchedulerStoreTest { - private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreTest.class); +public class JobSchedulerStoreTest extends TestCase { - @Test(timeout = 120 * 1000) public void testRestart() throws Exception { JobSchedulerStore store = new JobSchedulerStoreImpl(); File directory = new File("target/test/ScheduledDB"); - IOHelper.mkdirs(directory); - IOHelper.deleteChildren(directory); - store.setDirectory(directory); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + store.setDirectory(directory); final int NUMBER = 1000; store.start(); - List<ByteSequence> list = new ArrayList<ByteSequence>(); - for (int i = 0; i < NUMBER; i++) { - ByteSequence buff = new ByteSequence(new String("testjob" + i).getBytes()); + List<ByteSequence>list = new ArrayList<ByteSequence>(); + for (int i = 0; i < NUMBER;i++ ) { + ByteSequence buff = new ByteSequence(new String("testjob"+i).getBytes()); list.add(buff); } - JobScheduler js = store.getJobScheduler("test"); js.startDispatching(); int count = 0; - long startTime = 10 * 60 * 1000; - long period = startTime; - for (ByteSequence job : list) { - js.schedule("id:" + (count++), job, "", startTime, period, -1); + long startTime = 10 * 60 * 1000; long period = startTime; + for (ByteSequence job:list) { + js.schedule("id:"+(count++), job, "", startTime, period, -1); } - - List<Job> test = js.getAllJobs(); - LOG.debug("Found {} jobs in the store before restart", test.size()); - assertEquals(list.size(), test.size()); + List<Job>test = js.getAllJobs(); + assertEquals(list.size(),test.size()); store.stop(); store.start(); js = store.getJobScheduler("test"); test = js.getAllJobs(); - LOG.debug("Found {} jobs in the store after restart", test.size()); - assertEquals(list.size(), test.size()); - - for (int i = 0; i < list.size(); i++) { + assertEquals(list.size(),test.size()); + for (int i = 0; i < list.size();i++) { String orig = new String(list.get(i).getData()); String payload = new String(test.get(i).getPayload()); - assertEquals(orig, payload); + assertEquals(orig,payload); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java index 2210eba..5126970 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java @@ -31,13 +31,8 @@ import org.apache.activemq.util.IOHelper; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class JobSchedulerTest { - - private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerTest.class); - private JobSchedulerStore store; private JobScheduler scheduler; @@ -178,37 +173,6 @@ public class JobSchedulerTest { } @Test - public void testGetExecutionCount() throws Exception { - final String jobId = "Job-1"; - long time = 10000; - final CountDownLatch done = new CountDownLatch(10); - - String str = new String("test"); - scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10); - - int size = scheduler.getAllJobs().size(); - assertEquals(size, 1); - - scheduler.addListener(new JobListener() { - @Override - public void scheduledJob(String id, ByteSequence job) { - LOG.info("Job exectued: {}", 11 - done.getCount()); - done.countDown(); - } - }); - - List<Job> jobs = scheduler.getNextScheduleJobs(); - assertEquals(1, jobs.size()); - Job job = jobs.get(0); - assertEquals(jobId, job.getJobId()); - assertEquals(0, job.getExecutionCount()); - assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS)); - // The job is not updated on the last firing as it is removed from the store following - // it's last execution so the count will always be one less than the max firings. - assertTrue(job.getExecutionCount() >= 9); - } - - @Test public void testgetAllJobs() throws Exception { final int COUNT = 10; final String ID = "id:"; http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java deleted file mode 100644 index 2b25797..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.broker.scheduler; - -import java.io.File; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Queue; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.util.IOHelper; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.rules.TestName; - -/** - * Base class for tests of the Broker's JobSchedulerStore. - */ -public class JobSchedulerTestSupport { - - @Rule public TestName name = new TestName(); - - protected String connectionUri; - protected BrokerService broker; - protected JobScheduler jobScheduler; - protected Queue destination; - - @Before - public void setUp() throws Exception { - connectionUri = "vm://localhost"; - destination = new ActiveMQQueue(name.getMethodName()); - - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - - jobScheduler = broker.getJobSchedulerStore().getJobScheduler("JMS"); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - protected Connection createConnection() throws Exception { - return createConnectionFactory().createConnection(); - } - - protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(connectionUri); - } - - protected BrokerService createBroker() throws Exception { - return createBroker(true); - } - - protected boolean isUseJmx() { - return false; - } - - protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception { - ObjectName objectName = broker.getAdminView().getJMSJobScheduler(); - JobSchedulerViewMBean scheduler = null; - if (objectName != null) { - scheduler = (JobSchedulerViewMBean) broker.getManagementContext() - .newProxyInstance(objectName, JobSchedulerViewMBean.class, true); - } - - return scheduler; - } - - protected BrokerService createBroker(boolean delete) throws Exception { - File schedulerDirectory = new File("target/scheduler"); - if (delete) { - IOHelper.mkdirs(schedulerDirectory); - IOHelper.deleteChildren(schedulerDirectory); - } - - BrokerService answer = new BrokerService(); - answer.setPersistent(true); - answer.setDeleteAllMessagesOnStartup(true); - answer.setDataDirectory("target"); - answer.setSchedulerDirectoryFile(schedulerDirectory); - answer.setSchedulerSupport(true); - answer.setUseJmx(isUseJmx()); - return answer; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java deleted file mode 100644 index 9d0fef2..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java +++ /dev/null @@ -1,179 +0,0 @@ -package org.apache.activemq.broker.scheduler; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.security.ProtectionDomain; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KahaDBSchedulerIndexRebuildTest { - - static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class); - - private BrokerService broker = null; - private final int NUM_JOBS = 50; - - static String basedir; - static { - try { - ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain(); - basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath(); - } catch (IOException e) { - basedir = "."; - } - } - - private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler"); - private final File storeDir = new File(basedir, "activemq-data/store/"); - - @Before - public void setUp() throws Exception { - LOG.info("Test Dir = {}", schedulerStoreDir); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - @Test - public void testIndexRebuilds() throws Exception { - IOHelper.deleteFile(schedulerStoreDir); - - JobSchedulerStoreImpl schedulerStore = createScheduler(); - broker = createBroker(schedulerStore); - broker.start(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.start(); - for (int i = 0; i < NUM_JOBS; ++i) { - scheduleRepeating(connection); - } - connection.close(); - - JobScheduler scheduler = schedulerStore.getJobScheduler("JMS"); - assertNotNull(scheduler); - assertEquals(NUM_JOBS, scheduler.getAllJobs().size()); - - broker.stop(); - - IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data")); - - schedulerStore = createScheduler(); - broker = createBroker(schedulerStore); - broker.start(); - - scheduler = schedulerStore.getJobScheduler("JMS"); - assertNotNull(scheduler); - assertEquals(NUM_JOBS, scheduler.getAllJobs().size()); - } - - @Test - public void testIndexRebuildsAfterSomeJobsExpire() throws Exception { - IOHelper.deleteFile(schedulerStoreDir); - - JobSchedulerStoreImpl schedulerStore = createScheduler(); - broker = createBroker(schedulerStore); - broker.start(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.start(); - for (int i = 0; i < NUM_JOBS; ++i) { - scheduleRepeating(connection); - scheduleOneShot(connection); - } - connection.close(); - - JobScheduler scheduler = schedulerStore.getJobScheduler("JMS"); - assertNotNull(scheduler); - assertEquals(NUM_JOBS * 2, scheduler.getAllJobs().size()); - - final JobScheduler awaitingOneShotTimeout = scheduler; - assertTrue("One shot jobs should time out", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return awaitingOneShotTimeout.getAllJobs().size() == NUM_JOBS; - } - }, TimeUnit.MINUTES.toMillis(2))); - - broker.stop(); - - IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data")); - - schedulerStore = createScheduler(); - broker = createBroker(schedulerStore); - broker.start(); - - scheduler = schedulerStore.getJobScheduler("JMS"); - assertNotNull(scheduler); - assertEquals(NUM_JOBS, scheduler.getAllJobs().size()); - } - - private void scheduleRepeating(Connection connection) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test.queue"); - MessageProducer producer = session.createProducer(queue); - - TextMessage message = session.createTextMessage("test msg"); - long time = 360 * 1000; - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500); - message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1); - producer.send(message); - producer.close(); - } - - private void scheduleOneShot(Connection connection) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test.queue"); - MessageProducer producer = session.createProducer(queue); - - TextMessage message = session.createTextMessage("test msg"); - long time = TimeUnit.SECONDS.toMillis(30); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); - message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0); - producer.send(message); - producer.close(); - } - - protected JobSchedulerStoreImpl createScheduler() { - JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); - scheduler.setDirectory(schedulerStoreDir); - scheduler.setJournalMaxFileLength(10 * 1024); - return scheduler; - } - - protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception { - BrokerService answer = new BrokerService(); - answer.setJobSchedulerStore(scheduler); - answer.setPersistent(true); - answer.setDataDirectory(storeDir.getAbsolutePath()); - answer.setSchedulerSupport(true); - answer.setUseJmx(false); - return answer; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java deleted file mode 100644 index 30da10d..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.broker.scheduler; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.security.ProtectionDomain; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.disk.journal.DataFile; -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; -import org.apache.activemq.util.IOHelper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - *Test that the store recovers even if some log files are missing. - */ -public class KahaDBSchedulerMissingJournalLogsTest { - - static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class); - - private BrokerService broker = null; - private JobSchedulerStoreImpl schedulerStore = null; - - private final int NUM_LOGS = 6; - - static String basedir; - static { - try { - ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain(); - basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath(); - } catch (IOException e) { - basedir = "."; - } - } - - private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler"); - private final File storeDir = new File(basedir, "activemq-data/store/"); - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - IOHelper.deleteFile(schedulerStoreDir); - LOG.info("Test Dir = {}", schedulerStoreDir); - - createBroker(); - broker.start(); - broker.waitUntilStarted(); - - schedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore(); - } - - /** - * @throws java.lang.Exception - */ - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test(timeout=120 * 1000) - public void testMissingLogsCausesBrokerToFail() throws Exception { - fillUpSomeLogFiles(); - - int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size(); - LOG.info("There are {} jobs in the store.", jobCount); - - List<File> toDelete = new ArrayList<File>(); - Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap(); - for (int i = files.size(); i > files.size() / 2; i--) { - toDelete.add(files.get(i).getFile()); - } - - broker.stop(); - broker.waitUntilStopped(); - - for (File file : toDelete) { - LOG.info("File to delete: {}", file); - IOHelper.delete(file); - } - - try { - createBroker(); - fail("Should not start when logs are missing."); - } catch (Exception e) { - } - } - - @Test(timeout=120 * 1000) - public void testRecoverWhenSomeLogsAreMissing() throws Exception { - fillUpSomeLogFiles(); - - int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size(); - LOG.info("There are {} jobs in the store.", jobCount); - - List<File> toDelete = new ArrayList<File>(); - Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap(); - for (int i = files.size() - 1; i > files.size() / 2; i--) { - toDelete.add(files.get(i).getFile()); - } - - broker.stop(); - broker.waitUntilStopped(); - - for (File file : toDelete) { - LOG.info("File to delete: {}", file); - IOHelper.delete(file); - } - - schedulerStore = createScheduler(); - schedulerStore.setIgnoreMissingJournalfiles(true); - - createBroker(schedulerStore); - broker.start(); - broker.waitUntilStarted(); - - int postRecoverJobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size(); - assertTrue(postRecoverJobCount > 0); - assertTrue(postRecoverJobCount < jobCount); - } - - private void fillUpSomeLogFiles() throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test.queue"); - MessageProducer producer = session.createProducer(queue); - connection.start(); - while (true) { - scheduleRepeating(session, producer); - if (schedulerStore.getJournal().getFileMap().size() == NUM_LOGS) { - break; - } - } - connection.close(); - } - - private void scheduleRepeating(Session session, MessageProducer producer) throws Exception { - TextMessage message = session.createTextMessage("test msg"); - long time = 360 * 1000; - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500); - message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1); - producer.send(message); - } - - protected JobSchedulerStoreImpl createScheduler() { - JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); - scheduler.setDirectory(schedulerStoreDir); - scheduler.setJournalMaxFileLength(10 * 1024); - return scheduler; - } - - protected void createBroker() throws Exception { - createBroker(createScheduler()); - } - - protected void createBroker(JobSchedulerStoreImpl scheduler) throws Exception { - broker = new BrokerService(); - broker.setJobSchedulerStore(scheduler); - broker.setPersistent(true); - broker.setDataDirectory(storeDir.getAbsolutePath()); - broker.setSchedulerSupport(true); - broker.setUseJmx(false); - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java deleted file mode 100644 index 721f417..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.scheduler; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import java.security.ProtectionDomain; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; -import org.apache.activemq.util.IOHelper; -import org.junit.After; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SchedulerDBVersionTest { - static String basedir; - static { - try { - ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain(); - basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath(); - } catch (IOException e) { - basedir = "."; - } - } - - static final Logger LOG = LoggerFactory.getLogger(SchedulerDBVersionTest.class); - final static File VERSION_LEGACY_JMS = - new File(basedir + "/src/test/resources/org/apache/activemq/store/schedulerDB/legacy"); - - BrokerService broker = null; - - protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception { - BrokerService answer = new BrokerService(); - answer.setJobSchedulerStore(scheduler); - answer.setPersistent(true); - answer.setDataDirectory("target"); - answer.setSchedulerSupport(true); - answer.setUseJmx(false); - return answer; - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - @Ignore("Used only when a new version of the store needs to archive it's test data.") - @Test - public void testCreateStore() throws Exception { - JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); - File dir = new File("src/test/resources/org/apache/activemq/store/schedulerDB/legacy"); - IOHelper.deleteFile(dir); - scheduler.setDirectory(dir); - scheduler.setJournalMaxFileLength(1024 * 1024); - broker = createBroker(scheduler); - broker.start(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.start(); - scheduleRepeating(connection); - connection.close(); - broker.stop(); - } - - private void scheduleRepeating(Connection connection) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test.queue"); - MessageProducer producer = session.createProducer(queue); - - TextMessage message = session.createTextMessage("test msg"); - long time = 1000; - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500); - message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1); - producer.send(message); - producer.close(); - } - - @Test - public void testLegacyStoreConversion() throws Exception { - doTestScheduleRepeated(VERSION_LEGACY_JMS); - } - - public void doTestScheduleRepeated(File existingStore) throws Exception { - File testDir = new File("target/activemq-data/store/scheduler/versionDB"); - IOHelper.deleteFile(testDir); - IOHelper.copyFile(existingStore, testDir); - - final int NUMBER = 10; - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - - for (int i = 0; i < 3; ++i) { - JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl(); - scheduler.setDirectory(testDir); - scheduler.setJournalMaxFileLength(1024 * 1024); - BrokerService broker = createBroker(scheduler); - broker.start(); - broker.waitUntilStarted(); - - final AtomicInteger count = new AtomicInteger(); - Connection connection = cf.createConnection(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test.queue"); - - MessageConsumer consumer = session.createConsumer(queue); - - final CountDownLatch latch = new CountDownLatch(NUMBER); - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - LOG.info("Received scheduled message: {}", message); - latch.countDown(); - count.incrementAndGet(); - } - }); - - connection.start(); - assertEquals(latch.getCount(), NUMBER); - latch.await(30, TimeUnit.SECONDS); - - connection.close(); - broker.stop(); - broker.waitUntilStopped(); - - assertEquals(0, latch.getCount()); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/log4j.properties b/activemq-unit-tests/src/test/resources/log4j.properties index 85516aa..564ed9e 100755 --- a/activemq-unit-tests/src/test/resources/log4j.properties +++ b/activemq-unit-tests/src/test/resources/log4j.properties @@ -21,7 +21,6 @@ log4j.rootLogger=INFO, out, stdout #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG -#log4j.logger.org.apache.activemq.store.kahadb.scheduler=DEBUG #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG #log4j.logger.org.apache.activemq.transport.failover=TRACE #log4j.logger.org.apache.activemq.store.jdbc=TRACE http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log deleted file mode 100644 index 342f8c7..0000000 Binary files a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log and /dev/null differ http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data deleted file mode 100644 index 30c937d..0000000 Binary files a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data and /dev/null differ http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo deleted file mode 100644 index b06e549..0000000 Binary files a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo and /dev/null differ
