https://issues.apache.org/jira/browse/AMQ-3758
Refactor the scheduler store into a more KahaDB style store that can
recover from various problems like missing journal files or corruption
as well as rebuild its index when needed. Move the scheduler store into
a more configurable style that allows for users to plug in their own
implementations. Store update from legacy versions is automatic.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc244f48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc244f48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc244f48
Branch: refs/heads/activemq-5.10.x
Commit: fc244f48e48596c668a7d9dc3b84c26e60693823
Parents: db669e4
Author: Timothy Bish <[email protected]>
Authored: Mon Jul 7 12:28:11 2014 -0400
Committer: Hadrian Zbarcea <[email protected]>
Committed: Tue Dec 16 16:11:09 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/broker/BrokerService.java | 25 +-
.../activemq/broker/jmx/JobSchedulerView.java | 56 +-
.../broker/jmx/JobSchedulerViewMBean.java | 113 +-
.../apache/activemq/broker/scheduler/Job.java | 23 +-
.../activemq/broker/scheduler/JobListener.java | 16 +-
.../activemq/broker/scheduler/JobScheduler.java | 33 +-
.../broker/scheduler/JobSchedulerFacade.java | 6 +
.../broker/scheduler/JobSchedulerStore.java | 43 +
.../activemq/broker/scheduler/JobSupport.java | 5 +-
.../activemq/store/PersistenceAdapter.java | 119 +-
.../store/memory/MemoryPersistenceAdapter.java | 36 +-
.../java/org/apache/activemq/util/IOHelper.java | 68 +-
.../store/jdbc/JDBCPersistenceAdapter.java | 7 +
.../journal/JournalPersistenceAdapter.java | 71 +-
.../store/kahadb/AbstractKahaDBMetaData.java | 57 +
.../store/kahadb/AbstractKahaDBStore.java | 745 ++++++++++++
.../activemq/store/kahadb/KahaDBMetaData.java | 135 +++
.../store/kahadb/KahaDBPersistenceAdapter.java | 15 +-
.../activemq/store/kahadb/KahaDBStore.java | 55 +-
.../kahadb/MultiKahaDBPersistenceAdapter.java | 56 +-
.../kahadb/MultiKahaDBTransactionStore.java | 18 +-
.../activemq/store/kahadb/TempKahaDBStore.java | 138 ++-
.../apache/activemq/store/kahadb/Visitor.java | 20 +
.../store/kahadb/scheduler/JobImpl.java | 21 +-
.../store/kahadb/scheduler/JobLocation.java | 77 +-
.../scheduler/JobLocationsMarshaller.java | 53 +
.../kahadb/scheduler/JobSchedulerImpl.java | 837 ++++++++------
.../scheduler/JobSchedulerKahaDBMetaData.java | 246 ++++
.../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++-----
.../scheduler/UnknownStoreVersionException.java | 24 +
.../kahadb/scheduler/legacy/LegacyJobImpl.java | 72 ++
.../scheduler/legacy/LegacyJobLocation.java | 296 +++++
.../legacy/LegacyJobSchedulerImpl.java | 222 ++++
.../legacy/LegacyJobSchedulerStoreImpl.java | 378 ++++++
.../scheduler/legacy/LegacyStoreReplayer.java | 155 +++
.../src/main/proto/journal-data.proto | 61 +
.../apache/activemq/leveldb/LevelDBStore.scala | 5 +
.../leveldb/replicated/ProxyLevelDBStore.scala | 5 +
.../JobSchedulerBrokerShutdownTest.java | 1 +
.../JobSchedulerJmxManagementTests.java | 155 +++
.../scheduler/JobSchedulerManagementTest.java | 84 +-
.../JobSchedulerStoreCheckpointTest.java | 125 ++
.../broker/scheduler/JobSchedulerStoreTest.java | 46 +-
.../broker/scheduler/JobSchedulerTest.java | 36 +
.../scheduler/JobSchedulerTestSupport.java | 112 ++
.../KahaDBSchedulerIndexRebuildTest.java | 179 +++
.../KahaDBSchedulerMissingJournalLogsTest.java | 204 ++++
.../scheduler/SchedulerDBVersionTest.java | 164 +++
.../src/test/resources/log4j.properties | 1 +
.../activemq/store/schedulerDB/legacy/db-1.log | Bin 0 -> 524288 bytes
.../store/schedulerDB/legacy/scheduleDB.data | Bin 0 -> 20480 bytes
.../store/schedulerDB/legacy/scheduleDB.redo | Bin 0 -> 16408 bytes
52 files changed, 5584 insertions(+), 911 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 00d4abd..5becec2 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -1861,6 +1861,23 @@ public class BrokerService implements Service {
try {
PersistenceAdapter pa = getPersistenceAdapter();
+ if (pa != null) {
+ this.jobSchedulerStore = pa.createJobSchedulerStore();
+
jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+ configureService(jobSchedulerStore);
+ jobSchedulerStore.start();
+ return this.jobSchedulerStore;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (UnsupportedOperationException ex) {
+ // It's ok if the store doesn't implement a scheduler.
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ PersistenceAdapter pa = getPersistenceAdapter();
if (pa != null && pa instanceof JobSchedulerStore) {
this.jobSchedulerStore = (JobSchedulerStore) pa;
configureService(jobSchedulerStore);
@@ -1870,9 +1887,13 @@ public class BrokerService implements Service {
throw new RuntimeException(e);
}
+ // Load the KahaDB store as a last resort, this only works if KahaDB is
+ // included at runtime, otherwise this will fail. User should
disable
+ // scheduler support if this fails.
try {
- String clazz =
"org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
- jobSchedulerStore = (JobSchedulerStore)
getClass().getClassLoader().loadClass(clazz).newInstance();
+ String clazz =
"org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+ PersistenceAdapter adaptor =
(PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+ jobSchedulerStore = adaptor.createJobSchedulerStore();
jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
configureService(jobSchedulerStore);
jobSchedulerStore.start();
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index 9e5a1fb..2118a96 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -16,23 +16,39 @@
*/
package org.apache.activemq.broker.jmx;
+import java.util.List;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSupport;
-import javax.management.openmbean.*;
-import java.io.IOException;
-import java.util.List;
-
+/**
+ * MBean object that can be used to manage a single instance of a
JobScheduler. The object
+ * provides methods for querying for jobs and removing some or all of the jobs
that are
+ * scheduled in the managed store.
+ */
public class JobSchedulerView implements JobSchedulerViewMBean {
private final JobScheduler jobScheduler;
+ /**
+ * Creates a new instance of the JobScheduler management MBean.
+ *
+ * @param jobScheduler
+ * The scheduler instance to manage.
+ */
public JobSchedulerView(JobScheduler jobScheduler) {
this.jobScheduler = jobScheduler;
}
+ @Override
public TabularData getAllJobs() throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
@@ -45,6 +61,7 @@ public class JobSchedulerView implements
JobSchedulerViewMBean {
return rc;
}
+ @Override
public TabularData getAllJobs(String startTime, String finishTime) throws
Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
@@ -59,6 +76,7 @@ public class JobSchedulerView implements
JobSchedulerViewMBean {
return rc;
}
+ @Override
public TabularData getNextScheduleJobs() throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
@@ -71,31 +89,51 @@ public class JobSchedulerView implements
JobSchedulerViewMBean {
return rc;
}
+ @Override
public String getNextScheduleTime() throws Exception {
long time = this.jobScheduler.getNextScheduleTime();
return JobSupport.getDateTime(time);
}
+ @Override
public void removeAllJobs() throws Exception {
this.jobScheduler.removeAllJobs();
-
}
+ @Override
public void removeAllJobs(String startTime, String finishTime) throws
Exception {
long start = JobSupport.getDataTime(startTime);
long finish = JobSupport.getDataTime(finishTime);
this.jobScheduler.removeAllJobs(start, finish);
+ }
+ @Override
+ public void removeAllJobsAtScheduledTime(String time) throws Exception {
+ long removeAtTime = JobSupport.getDataTime(time);
+ this.jobScheduler.remove(removeAtTime);
}
+ @Override
+ public void removeJobAtScheduledTime(String time) throws Exception {
+ removeAllJobsAtScheduledTime(time);
+ }
+
+ @Override
public void removeJob(String jobId) throws Exception {
this.jobScheduler.remove(jobId);
-
}
- public void removeJobAtScheduledTime(String time) throws IOException {
- // TODO Auto-generated method stub
+ @Override
+ public int getExecutionCount(String jobId) throws Exception {
+ int result = 0;
- }
+ List<Job> jobs = this.jobScheduler.getAllJobs();
+ for (Job job : jobs) {
+ if (job.getJobId().equals(jobId)) {
+ result = job.getExecutionCount();
+ }
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index f5745ea..76a7926 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx;
import javax.management.openmbean.TabularData;
-
-
public interface JobSchedulerViewMBean {
+
/**
- * remove all jobs scheduled to run at this time
+ * Remove all jobs scheduled to run at this time. If there are no jobs
scheduled
+ * at the given time this method returns without making any modifications
to the
+ * scheduler store.
+ *
* @param time
- * @throws Exception
+ * the string formatted time that should be used to remove jobs.
+ *
+ * @throws Exception if an error occurs while performing the remove.
+ *
+ * @deprecated use removeAllJobsAtScheduledTime instead as it is more
explicit about what
+ * the method is actually doing.
*/
+ @Deprecated
@MBeanInfo("remove jobs with matching execution time")
public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd
hh:mm:ss")String time) throws Exception;
/**
- * remove a job with the matching jobId
+ * Remove all jobs scheduled to run at this time. If there are no jobs
scheduled
+ * at the given time this method returns without making any modifications
to the
+ * scheduler store.
+ *
+ * @param time
+ * the string formatted time that should be used to remove jobs.
+ *
+ * @throws Exception if an error occurs while performing the remove.
+ */
+ @MBeanInfo("remove jobs with matching execution time")
+ public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd
hh:mm:ss")String time) throws Exception;
+
+ /**
+ * Remove a job with the matching jobId. If the method does not find a
matching job
+ * then it returns without throwing an error or making any modifications
to the job
+ * scheduler store.
+ *
* @param jobId
- * @throws Exception
+ * the Job Id to remove from the scheduler store.
+ *
+ * @throws Exception if an error occurs while attempting to remove the Job.
*/
@MBeanInfo("remove jobs with matching jobId")
public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws
Exception;
-
+
/**
- * remove all the Jobs from the scheduler
- * @throws Exception
+ * Remove all the Jobs from the scheduler.
+ *
+ * @throws Exception if an error occurs while purging the store.
*/
@MBeanInfo("remove all scheduled jobs")
public abstract void removeAllJobs() throws Exception;
-
+
/**
- * remove all the Jobs from the scheduler that are due between the start
and finish times
- * @param start time
- * @param finish time
- * @throws Exception
+ * Remove all the Jobs from the scheduler that are due between the start
and finish times.
+ *
+ * @param start
+ * the starting time to remove jobs from.
+ * @param finish
+ * the finish time for the remove operation.
+ *
+ * @throws Exception if an error occurs while attempting to remove the
jobs.
*/
@MBeanInfo("remove all scheduled jobs between time ranges ")
public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String
start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
-
-
/**
- * Get the next time jobs will be fired
- * @return the time in milliseconds
- * @throws Exception
+ * Get the next time jobs will be fired from this scheduler store.
+ *
+ * @return the time of the next job to execute, formatted as yyyy-MM-dd HH:mm:ss.
+ *
+ * @throws Exception if an error occurs while accessing the store.
*/
@MBeanInfo("get the next time a job is due to be scheduled ")
public abstract String getNextScheduleTime() throws Exception;
-
+
+ /**
+ * Gets the number of times a scheduled Job has been executed.
+ *
+ * @return the total number of times a scheduled job has executed.
+ *
+ * @throws Exception if an error occurs while querying for the Job.
+ */
+ @MBeanInfo("get the next time a job is due to be scheduled ")
+ public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId)
throws Exception;
+
/**
- * Get all the jobs scheduled to run next
+ * Get all the jobs scheduled to run next.
+ *
* @return a list of jobs that will be scheduled next
- * @throws Exception
+ *
+ * @throws Exception if an error occurs while reading the scheduler store.
*/
@MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
public abstract TabularData getNextScheduleJobs() throws Exception;
-
- /**
- * Get all the outstanding Jobs
- * @return a table of all jobs
- * @throws Exception
+ /**
+ * Get all the outstanding Jobs that are scheduled in this scheduler store.
+ *
+ * @return a table of all jobs in this scheduler store.
+ *
+ * @throws Exception if an error occurs while reading the store.
*/
@MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
public abstract TabularData getAllJobs() throws Exception;
-
+
/**
- * Get all outstanding jobs due to run between start and finish
+ * Get all outstanding jobs due to run between start and finish time range.
+ *
* @param start
+ * the starting time range to query the store for jobs.
* @param finish
- * @return a table of jobs in the range
- * @throws Exception
-
+ * the ending time of this query for scheduled jobs.
+ *
+ * @return a table of jobs in the range given.
+ *
+ * @throws Exception if an error occurs while querying the scheduler store.
*/
@MBeanInfo("get the scheduled Jobs in the Store within the time range. Not
HTML friendly ")
public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd
hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws
Exception;
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
index 7b28a5b..047fe23 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
@@ -16,7 +16,12 @@
*/
package org.apache.activemq.broker.scheduler;
-
+/**
+ * Interface for a scheduled Job object.
+ *
+ * Each Job is identified by a unique Job Id which can be used to reference
the Job
+ * in the Job Scheduler store for updates or removal.
+ */
public interface Job {
/**
@@ -38,11 +43,12 @@ public interface Job {
* @return the Delay
*/
public abstract long getDelay();
+
/**
* @return the period
*/
public abstract long getPeriod();
-
+
/**
* @return the cron entry
*/
@@ -52,17 +58,24 @@ public interface Job {
* @return the payload
*/
public abstract byte[] getPayload();
-
+
/**
* Get the start time as a Date time string
* @return the date time
*/
public String getStartTime();
-
+
/**
- * Get the time the job is next due to execute
+ * Get the time the job is next due to execute
* @return the date time
*/
public String getNextExecutionTime();
+ /**
+ * Gets the total number of times this job has executed.
+ *
+ * @return the number of times this job has been executed.
+ */
+ public int getExecutionCount();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
index c53d9c6..a453595 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
@@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler;
import org.apache.activemq.util.ByteSequence;
+/**
+ * Job event listener interface. Provides event points for Job related events
+ * such as job ready events.
+ */
public interface JobListener {
-
+
/**
- * A Job that has been scheduled is now ready
- * @param id
+ * A Job that has been scheduled is now ready to be fired. The Job is
passed
+ * in its raw byte form and must be un-marshaled before being delivered.
+ *
+ * @param id
+ * The unique Job Id of the Job that is ready to fire.
* @param job
+ * The job that is now ready, delivered in byte form.
*/
- public void scheduledJob(String id,ByteSequence job);
+ public void scheduledJob(String id, ByteSequence job);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
index 2e96eae..e951861 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
@@ -46,20 +46,25 @@ public interface JobScheduler {
void stopDispatching() throws Exception;
/**
- * Add a Job listener
+ * Add a Job listener which will receive events related to scheduled jobs.
+ *
+ * @param listener
+ * The job listener to add.
*
- * @param l
* @throws Exception
*/
- void addListener(JobListener l) throws Exception;
+ void addListener(JobListener listener) throws Exception;
/**
- * remove a JobListener
+ * remove a JobListener that was previously registered. If the given
listener is not in
+ * the registry this method has no effect.
+ *
+ * @param listener
+ * The listener that should be removed from the listener registry.
*
- * @param l
* @throws Exception
*/
- void removeListener(JobListener l) throws Exception;
+ void removeListener(JobListener listener) throws Exception;
/**
* Add a job to be scheduled
@@ -70,7 +75,8 @@ public interface JobScheduler {
* the message to be sent when the job is scheduled
* @param delay
* the time in milliseconds before the job will be run
- * @throws Exception
+ *
+ * @throws Exception if an error occurs while scheduling the Job.
*/
void schedule(String jobId, ByteSequence payload, long delay) throws
Exception;
@@ -82,8 +88,9 @@ public interface JobScheduler {
* @param payload
* the message to be sent when the job is scheduled
* @param cronEntry
- * - cron entry
- * @throws Exception
+ * The cron entry to use to schedule this job.
+ *
+ * @throws Exception if an error occurs while scheduling the Job.
*/
void schedule(String jobId, ByteSequence payload, String cronEntry)
throws Exception;
@@ -95,7 +102,7 @@ public interface JobScheduler {
* @param payload
* the message to be sent when the job is scheduled
* @param cronEntry
- * - cron entry
+ * cron entry
* @param delay
* time in ms to wait before scheduling
* @param period
@@ -110,6 +117,8 @@ public interface JobScheduler {
* remove all jobs scheduled to run at this time
*
* @param time
+ * The UTC time to use to remove a batch of scheduled Jobs.
+ *
* @throws Exception
*/
void remove(long time) throws Exception;
@@ -118,7 +127,9 @@ public interface JobScheduler {
* remove a job with the matching jobId
*
* @param jobId
- * @throws Exception
+ * The unique Job Id to search for and remove from the scheduled set
of jobs.
+ *
+ * @throws Exception if an error occurs while removing the Job.
*/
void remove(String jobId) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
index d46d04a..24a216a 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
@@ -21,6 +21,12 @@ import java.util.List;
import org.apache.activemq.util.ByteSequence;
+/**
+ * A wrapper for instances of the JobScheduler interface that ensures that
methods
+ * provide safe and sane return values and can deal with null values being
passed
+ * in etc. Provides a measure of safety when using unknown implementations of
the
+ * JobSchedulerStore which might not always do the right thing.
+ */
public class JobSchedulerFacade implements JobScheduler {
private final SchedulerBroker broker;
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
index 3cbc367..c6863c7 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
@@ -26,13 +26,56 @@ import org.apache.activemq.Service;
*/
public interface JobSchedulerStore extends Service {
+ /**
+ * Gets the location where the Job Scheduler will write the persistent
data used
+ * to preserve and recover scheduled Jobs.
+ *
+ * If the scheduler implementation does not utilize a file system based
store this
+ * method returns null.
+ *
+ * @return the directory where persistent store data is written.
+ */
File getDirectory();
+ /**
+ * Sets the directory where persistent store data will be written. This
method
+ * must be called before the scheduler store is started to have any effect.
+ *
+ * @param directory
+ * The directory where the job scheduler store is to be located.
+ */
void setDirectory(File directory);
+ /**
+ * The size of the current store on disk if the store utilizes a disk
based store
+ * mechanism.
+ *
+ * @return the current store size on disk.
+ */
long size();
+ /**
+ * Returns the JobScheduler instance identified by the given name.
+ *
+ * @param name
+ * the name of the JobScheduler instance to lookup.
+ *
+ * @return the named JobScheduler or null if none exists with the given
name.
+ *
+ * @throws Exception if an error occurs while loading the named scheduler.
+ */
JobScheduler getJobScheduler(String name) throws Exception;
+ /**
+ * Removes the named JobScheduler if it exists, purging all scheduled
messages
+ * assigned to it.
+ *
+ * @param name
+ * the name of the scheduler instance to remove.
+ *
+ * @return true if there was a scheduler with the given name to remove.
+ *
+ * @throws Exception if an error occurs while removing the scheduler.
+ */
boolean removeJobScheduler(String name) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
index 6b78d77..fc5b8dd 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
@@ -20,7 +20,11 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
+/**
+ * A class to provide common Job Scheduler related methods.
+ */
public class JobSupport {
+
public static String getDateTime(long value) {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date(value);
@@ -32,5 +36,4 @@ public class JobSupport {
Date date = dfm.parse(value);
return date.getTime();
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 31efd32..01a9634 100755
---
a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage;
/**
* Adapter to the actual persistence mechanism used with ActiveMQ
*
- *
+ *
*/
public interface PersistenceAdapter extends Service {
/**
- * Returns a set of all the {@link
org.apache.activemq.command.ActiveMQDestination}
- * objects that the persistence store is aware exist.
+ * Returns a set of all the
+ * {@link org.apache.activemq.command.ActiveMQDestination} objects that the
+ * persistence store is aware exist.
*
* @return active destinations
*/
Set<ActiveMQDestination> getDestinations();
/**
- * Factory method to create a new queue message store with the given
destination name
+ * Factory method to create a new queue message store with the given
+ * destination name
+ *
* @param destination
* @return the message store
- * @throws IOException
+ * @throws IOException
*/
MessageStore createQueueMessageStore(ActiveMQQueue destination) throws
IOException;
/**
- * Factory method to create a new topic message store with the given
destination name
- * @param destination
+ * Factory method to create a new topic message store with the given
+ * destination name
+ *
+ * @param destination
* @return the topic message store
- * @throws IOException
+ * @throws IOException
*/
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
throws IOException;
/**
+ * Creates and returns a new Job Scheduler store instance.
+ *
+ * @return a new JobSchedulerStore instance if this Persistence adapter
provides its own.
+ *
+ * @throws IOException If an error occurs while creating the new
JobSchedulerStore.
+ * @throws UnsupportedOperationException If this adapter does not provide
its own
+ * scheduler store implementation.
+ */
+ JobSchedulerStore createJobSchedulerStore() throws IOException,
UnsupportedOperationException;
+
+ /**
* Cleanup method to remove any state associated with the given
destination.
* This method does not stop the message store (it might not be cached).
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
void removeQueueMessageStore(ActiveMQQueue destination);
/**
* Cleanup method to remove any state associated with the given
destination
* This method does not stop the message store (it might not be cached).
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
void removeTopicMessageStore(ActiveMQTopic destination);
/**
- * Factory method to create a new persistent prepared transaction store
for XA recovery
+ * Factory method to create a new persistent prepared transaction store for
+ * XA recovery
+ *
* @return transaction store
- * @throws IOException
+ * @throws IOException
*/
TransactionStore createTransactionStore() throws IOException;
/**
- * This method starts a transaction on the persistent storage - which is
nothing to
- * do with JMS or XA transactions - its purely a mechanism to perform
multiple writes
- * to a persistent store in 1 transaction as a performance optimization.
+ * This method starts a transaction on the persistent storage - which is
+ * nothing to do with JMS or XA transactions - it's purely a mechanism to
+ * perform multiple writes to a persistent store in 1 transaction as a
+ * performance optimization.
* <p/>
- * Typically one transaction will require one disk synchronization point
and so for
- * real high performance its usually faster to perform many writes within
the same
- * transaction to minimize latency caused by disk synchronization. This is
especially
- * true when using tools like Berkeley Db or embedded JDBC servers.
- * @param context
- * @throws IOException
+ * Typically one transaction will require one disk synchronization point
and
+ * so for real high performance it's usually faster to perform many writes
+ * within the same transaction to minimize latency caused by disk
+ * synchronization. This is especially true when using tools like Berkeley
+ * Db or embedded JDBC servers.
+ *
+ * @param context
+ * @throws IOException
*/
void beginTransaction(ConnectionContext context) throws IOException;
-
/**
* Commit a persistence transaction
- * @param context
- * @throws IOException
+ *
+ * @param context
+ * @throws IOException
*
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
@@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service {
/**
* Rollback a persistence transaction
- * @param context
- * @throws IOException
+ *
+ * @param context
+ * @throws IOException
*
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
void rollbackTransaction(ConnectionContext context) throws IOException;
-
+
/**
- *
+ *
* @return last broker sequence
* @throws IOException
*/
long getLastMessageBrokerSequenceId() throws IOException;
-
+
/**
* Delete's all the messages in the persistent store.
- *
+ *
* @throws IOException
*/
void deleteAllMessages() throws IOException;
-
+
/**
- * @param usageManager The UsageManager that is controlling the broker's
memory usage.
+ * @param usageManager
+ * The UsageManager that is controlling the broker's memory
+ * usage.
*/
void setUsageManager(SystemUsage usageManager);
-
+
/**
* Set the name of the broker using the adapter
+ *
* @param brokerName
*/
void setBrokerName(String brokerName);
-
+
/**
* Set the directory where any data files should be created
+ *
* @param dir
*/
void setDirectory(File dir);
@@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service {
* @return the directory used by the persistence adaptor
*/
File getDirectory();
-
+
/**
* checkpoint any
- * @param sync
- * @throws IOException
+ *
+ * @param sync
+ * @throws IOException
*
*/
void checkpoint(boolean sync) throws IOException;
-
+
/**
* A hint to return the size of the store on disk
+ *
* @return disk space used in bytes of 0 if not implemented
*/
long size();
/**
- * return the last stored producer sequenceId for this producer Id
- * used to suppress duplicate sends on failover reconnect at the transport
- * when a reconnect occurs
- * @param id the producerId to find a sequenceId for
+ * return the last stored producer sequenceId for this producer Id used to
+ * suppress duplicate sends on failover reconnect at the transport when a
+ * reconnect occurs
+ *
+ * @param id
+ * the producerId to find a sequenceId for
* @return the last stored sequence id or -1 if no suppression needed
*/
long getLastProducerSequenceId(ProducerId id) throws IOException;
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 0fd6bfc..73ea104 100755
---
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
- *
+ *
*/
public class MemoryPersistenceAdapter implements PersistenceAdapter {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter {
ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new
ConcurrentHashMap<ActiveMQDestination, MessageStore>();
private boolean useExternalMessageReferences;
+ @Override
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> rc = new
HashSet<ActiveMQDestination>(queues.size() + topics.size());
for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator();
iter.hasNext();) {
@@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter {
return new MemoryPersistenceAdapter();
}
+ @Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination)
throws IOException {
MessageStore rc = queues.get(destination);
if (rc == null) {
@@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter {
return rc;
}
+ @Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic
destination) throws IOException {
TopicMessageStore rc = topics.get(destination);
if (rc == null) {
@@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter {
*
* @param destination Destination to forget
*/
+ @Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
queues.remove(destination);
}
@@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter {
*
* @param destination Destination to forget
*/
+ @Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
+ @Override
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new MemoryTransactionStore(this);
@@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter {
return transactionStore;
}
+ @Override
public void beginTransaction(ConnectionContext context) {
}
+ @Override
public void commitTransaction(ConnectionContext context) {
}
+ @Override
public void rollbackTransaction(ConnectionContext context) {
}
+ @Override
public void start() throws Exception {
}
+ @Override
public void stop() throws Exception {
}
+ @Override
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
+ @Override
public void deleteAllMessages() throws IOException {
for (Iterator<TopicMessageStore> iter = topics.values().iterator();
iter.hasNext();) {
MemoryMessageStore store = asMemoryMessageStore(iter.next());
@@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter {
* @param usageManager The UsageManager that is controlling the broker's
* memory usage.
*/
+ @Override
public void setUsageManager(SystemUsage usageManager) {
}
+ @Override
public String toString() {
return "MemoryPersistenceAdapter";
}
+ @Override
public void setBrokerName(String brokerName) {
}
+ @Override
public void setDirectory(File dir) {
}
-
+
+ @Override
public File getDirectory(){
return null;
}
+ @Override
public void checkpoint(boolean sync) throws IOException {
}
-
+
+ @Override
public long size(){
return 0;
}
-
+
public void setCreateTransactionStore(boolean create) throws IOException {
if (create) {
createTransactionStore();
}
}
+ @Override
public long getLastProducerSequenceId(ProducerId id) {
// memory map does duplicate suppression
return -1;
}
+
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException,
UnsupportedOperationException {
+ // We could eventually implement an in memory scheduler.
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
----------------------------------------------------------------------
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
index a623de9..2a70194 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
@@ -61,8 +61,9 @@ public final class IOHelper {
}
/**
- * Converts any string into a string that is safe to use as a file name.
- * The result will only include ascii characters and numbers, and the "-","_", and
"." characters.
+ * Converts any string into a string that is safe to use as a file name. The
+ * result will only include ascii characters and numbers, and the "-","_",
+ * and "." characters.
*
* @param name
* @return
@@ -76,15 +77,16 @@ public final class IOHelper {
}
/**
- * Converts any string into a string that is safe to use as a file name.
- * The result will only include ascii characters and numbers, and the "-","_", and
"." characters.
+ * Converts any string into a string that is safe to use as a file name. The
+ * result will only include ascii characters and numbers, and the "-","_",
+ * and "." characters.
*
* @param name
* @param dirSeparators
* @param maxFileLength
* @return
*/
- public static String toFileSystemSafeName(String name,boolean
dirSeparators,int maxFileLength) {
+ public static String toFileSystemSafeName(String name, boolean
dirSeparators, int maxFileLength) {
int size = name.length();
StringBuffer rc = new StringBuffer(size * 2);
for (int i = 0; i < size; i++) {
@@ -92,8 +94,7 @@ public final class IOHelper {
boolean valid = c >= 'a' && c <= 'z';
valid = valid || (c >= 'A' && c <= 'Z');
valid = valid || (c >= '0' && c <= '9');
- valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
- ||(dirSeparators && ( (c == '/') || (c == '\\')));
+ valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') ||
(dirSeparators && ((c == '/') || (c == '\\')));
if (valid) {
rc.append(c);
@@ -105,7 +106,7 @@ public final class IOHelper {
}
String result = rc.toString();
if (result.length() > maxFileLength) {
- result =
result.substring(result.length()-maxFileLength,result.length());
+ result = result.substring(result.length() - maxFileLength,
result.length());
}
return result;
}
@@ -168,8 +169,7 @@ public final class IOHelper {
} else {
for (int i = 0; i < files.length; i++) {
File file = files[i];
- if (file.getName().equals(".")
- || file.getName().equals("..")) {
+ if (file.getName().equals(".") ||
file.getName().equals("..")) {
continue;
}
if (file.isDirectory()) {
@@ -190,6 +190,27 @@ public final class IOHelper {
}
}
+ public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException {
+ if (!srcDirectory.isDirectory()) {
+ throw new IOException("source is not a directory");
+ }
+
+ if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
+ throw new IOException("target exists and is not a directory");
+ } else {
+ mkdirs(targetDirectory);
+ }
+
+ List<File> filesToMove = new ArrayList<File>();
+ getFiles(srcDirectory, filesToMove, filter);
+
+ for (File file : filesToMove) {
+ if (!file.isDirectory()) {
+ moveFile(file, targetDirectory);
+ }
+ }
+ }
+
public static void copyFile(File src, File dest) throws IOException {
copyFile(src, dest, null);
}
@@ -222,32 +243,32 @@ public final class IOHelper {
File parent = src.getParentFile();
String fromPath = from.getAbsolutePath();
if (parent.getAbsolutePath().equals(fromPath)) {
- //one level down
+ // one level down
result = to;
- }else {
+ } else {
String parentPath = parent.getAbsolutePath();
String path = parentPath.substring(fromPath.length());
- result = new File(to.getAbsolutePath()+File.separator+path);
+ result = new File(to.getAbsolutePath() + File.separator + path);
}
return result;
}
- static List<File> getFiles(File dir,FilenameFilter filter){
+ static List<File> getFiles(File dir, FilenameFilter filter) {
List<File> result = new ArrayList<File>();
- getFiles(dir,result,filter);
+ getFiles(dir, result, filter);
return result;
}
- static void getFiles(File dir,List<File> list,FilenameFilter filter) {
+ static void getFiles(File dir, List<File> list, FilenameFilter filter) {
if (!list.contains(dir)) {
list.add(dir);
- String[] fileNames=dir.list(filter);
- for (int i =0; i < fileNames.length;i++) {
- File f = new File(dir,fileNames[i]);
+ String[] fileNames = dir.list(filter);
+ for (int i = 0; i < fileNames.length; i++) {
+ File f = new File(dir, fileNames[i]);
if (f.isFile()) {
list.add(f);
- }else {
- getFiles(dir,list,filter);
+ } else {
+ getFiles(dir, list, filter);
}
}
}
@@ -286,12 +307,13 @@ public final class IOHelper {
public static void mkdirs(File dir) throws IOException {
if (dir.exists()) {
if (!dir.isDirectory()) {
- throw new IOException("Failed to create directory '" + dir +"',
regular file already existed with that name");
+ throw new IOException("Failed to create directory '" + dir +
+ "', regular file already existed with that
name");
}
} else {
if (!dir.mkdirs()) {
- throw new IOException("Failed to create directory '" +
dir+"'");
+ throw new IOException("Failed to create directory '" + dir +
"'");
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 7ff4ae0..a3a8250 100755
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends
DataSourceServiceSupport implements
this.lockDataSource = dataSource;
}
+ @Override
public BrokerService getBrokerService() {
return brokerService;
}
@@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends
DataSourceServiceSupport implements
}
return result;
}
+
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException,
UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
----------------------------------------------------------------------
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
index 565fc9f..cc5282f 100755
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
@@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory;
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with
some
* other long term persistent storage.
- *
+ *
* @org.apache.xbean.XBean
- *
+ *
*/
public class JournalPersistenceAdapter implements PersistenceAdapter,
JournalEventListener, UsageListener, BrokerServiceAware {
private BrokerService brokerService;
-
+
protected Scheduler scheduler;
private static final Logger LOG =
LoggerFactory.getLogger(JournalPersistenceAdapter.class);
@@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private TaskRunnerFactory taskRunnerFactory;
private File directory;
- public JournalPersistenceAdapter() {
+ public JournalPersistenceAdapter() {
}
-
+
public JournalPersistenceAdapter(Journal journal, PersistenceAdapter
longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
setJournal(journal);
setTaskRunnerFactory(taskRunnerFactory);
@@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
this.journal = journal;
journal.setJournalEventListener(this);
}
-
+
public void setPersistenceAdapter(PersistenceAdapter longTermPersistence)
{
this.longTermPersistence = longTermPersistence;
}
-
+
final Runnable createPeriodicCheckpointTask() {
return new Runnable() {
+ @Override
public void run() {
long lastTime = 0;
synchronized (this) {
@@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
+ @Override
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
}
+ @Override
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new
HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
destinations.addAll(queues.keySet());
@@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
}
}
+ @Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination)
throws IOException {
JournalMessageStore store = queues.get(destination);
if (store == null) {
@@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
return store;
}
+ @Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic
destinationName) throws IOException {
JournalTopicMessageStore store = topics.get(destinationName);
if (store == null) {
@@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
*
* @param destination Destination to forget
*/
+ @Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
queues.remove(destination);
}
@@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
*
* @param destination Destination to forget
*/
+ @Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
+ @Override
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
+ @Override
public long getLastMessageBrokerSequenceId() throws IOException {
return longTermPersistence.getLastMessageBrokerSequenceId();
}
+ @Override
public void beginTransaction(ConnectionContext context) throws
IOException {
longTermPersistence.beginTransaction(context);
}
+ @Override
public void commitTransaction(ConnectionContext context) throws
IOException {
longTermPersistence.commitTransaction(context);
}
+ @Override
public void rollbackTransaction(ConnectionContext context) throws
IOException {
longTermPersistence.rollbackTransaction(context);
}
+ @Override
public synchronized void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
@@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
}
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
+ @Override
public boolean iterate() {
return doCheckpoint();
}
}, "ActiveMQ Journal Checkpoint Worker");
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ @Override
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker");
t.setPriority(7);
@@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
}
+ @Override
public void stop() throws Exception {
this.usageManager.getMemoryUsage().removeUsageListener(this);
@@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
/**
* The Journal give us a call back so that we can move old data out of the
* journal. Taking a checkpoint does this for us.
- *
+ *
* @see
org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
*/
+ @Override
public void overflowNotification(RecordLocation safeLocation) {
checkpoint(false, true);
}
/**
* When we checkpoint we move all the journalled data to long term
storage.
- *
+ *
*/
public void checkpoint(boolean sync, boolean fullCheckpoint) {
try {
@@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
}
}
+ @Override
public void checkpoint(boolean sync) {
checkpoint(sync, sync);
}
/**
* This does the actual checkpoint.
- *
+ *
* @return
*/
public boolean doCheckpoint() {
@@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
// We do many partial checkpoints (fullCheckpoint==false) to move
// topic messages
// to long term store as soon as possible.
- //
+ //
// We want to avoid doing that for queue messages since removes
the
// come in the same
// checkpoint cycle will nullify the previous message add.
@@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
try {
final JournalMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new
FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+ @Override
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
@@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
try {
final JournalTopicMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new
FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+ @Override
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
@@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
/**
* Move all the messages that were in the journal into long term storage.
We
* just replay and do a checkpoint.
- *
+ *
* @throws IOException
* @throws IOException
* @throws InvalidRecordLocationException
@@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
public RecordLocation writeCommand(DataStructure command, boolean sync)
throws IOException {
if (started.get()) {
try {
- return journal.write(toPacket(wireFormat.marshal(command)),
sync);
+ return journal.write(toPacket(wireFormat.marshal(command)),
sync);
} catch (IOException ioe) {
- LOG.error("Cannot write to the journal", ioe);
- brokerService.handleIOException(ioe);
- throw ioe;
+ LOG.error("Cannot write to the journal", ioe);
+ brokerService.handleIOException(ioe);
+ throw ioe;
}
}
throw new IOException("closed");
@@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
return writeCommand(trace, sync);
}
+ @Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int
newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
@@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
return transactionStore;
}
+ @Override
public void deleteAllMessages() throws IOException {
try {
JournalTrace trace = new JournalTrace();
@@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
return new ByteSequence(sequence.getData(), sequence.getOffset(),
sequence.getLength());
}
+ @Override
public void setBrokerName(String brokerName) {
longTermPersistence.setBrokerName(brokerName);
}
@@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
return "JournalPersistenceAdapter(" + longTermPersistence + ")";
}
+ @Override
public void setDirectory(File dir) {
this.directory=dir;
}
-
+
+ @Override
public File getDirectory(){
return directory;
}
-
+
+ @Override
public long size(){
return 0;
}
+ @Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
PersistenceAdapter pa = getLongTermPersistence();
@@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements
PersistenceAdapter, JournalEve
}
}
+ @Override
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException,
UnsupportedOperationException {
+ return longTermPersistence.createJobSchedulerStore();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
----------------------------------------------------------------------
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
new file mode 100644
index 0000000..edb2750
--- /dev/null
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Page;
+
+public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> {
+
+ private int state;
+ private Location lastUpdateLocation;
+ private Page<T> page;
+
+ @Override
+ public Page<T> getPage() {
+ return page;
+ }
+
+ @Override
+ public int getState() {
+ return state;
+ }
+
+ @Override
+ public Location getLastUpdateLocation() {
+ return lastUpdateLocation;
+ }
+
+ @Override
+ public void setPage(Page<T> page) {
+ this.page = page;
+ }
+
+ @Override
+ public void setState(int value) {
+ this.state = value;
+ }
+
+ @Override
+ public void setLastUpdateLocation(Location location) {
+ this.lastUpdateLocation = location;
+ }
+}
.