-1

This is a huge change that completely alters the Job scheduler store to use a totally new store format, and while the update does allow for upgrading from an older to a newer store, this is not something I would expect to see in a point release.

On 12/16/2014 05:10 PM, [email protected] wrote:
https://issues.apache.org/jira/browse/AMQ-3758

Refactor the scheduler store into a more KahaDB style store that can
recover from various problems like missing journal files or corruption
as well as rebuild its index when needed.  Move the scheduler store into
a more configurable style that allows for users to plug in their own
implementations.  Store update from legacy versions is automatic.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc244f48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc244f48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc244f48

Branch: refs/heads/activemq-5.10.x
Commit: fc244f48e48596c668a7d9dc3b84c26e60693823
Parents: db669e4
Author: Timothy Bish <[email protected]>
Authored: Mon Jul 7 12:28:11 2014 -0400
Committer: Hadrian Zbarcea <[email protected]>
Committed: Tue Dec 16 16:11:09 2014 -0500

----------------------------------------------------------------------
  .../apache/activemq/broker/BrokerService.java   |   25 +-
  .../activemq/broker/jmx/JobSchedulerView.java   |   56 +-
  .../broker/jmx/JobSchedulerViewMBean.java       |  113 +-
  .../apache/activemq/broker/scheduler/Job.java   |   23 +-
  .../activemq/broker/scheduler/JobListener.java  |   16 +-
  .../activemq/broker/scheduler/JobScheduler.java |   33 +-
  .../broker/scheduler/JobSchedulerFacade.java    |    6 +
  .../broker/scheduler/JobSchedulerStore.java     |   43 +
  .../activemq/broker/scheduler/JobSupport.java   |    5 +-
  .../activemq/store/PersistenceAdapter.java      |  119 +-
  .../store/memory/MemoryPersistenceAdapter.java  |   36 +-
  .../java/org/apache/activemq/util/IOHelper.java |   68 +-
  .../store/jdbc/JDBCPersistenceAdapter.java      |    7 +
  .../journal/JournalPersistenceAdapter.java      |   71 +-
  .../store/kahadb/AbstractKahaDBMetaData.java    |   57 +
  .../store/kahadb/AbstractKahaDBStore.java       |  745 ++++++++++++
  .../activemq/store/kahadb/KahaDBMetaData.java   |  135 +++
  .../store/kahadb/KahaDBPersistenceAdapter.java  |   15 +-
  .../activemq/store/kahadb/KahaDBStore.java      |   55 +-
  .../kahadb/MultiKahaDBPersistenceAdapter.java   |   56 +-
  .../kahadb/MultiKahaDBTransactionStore.java     |   18 +-
  .../activemq/store/kahadb/TempKahaDBStore.java  |  138 ++-
  .../apache/activemq/store/kahadb/Visitor.java   |   20 +
  .../store/kahadb/scheduler/JobImpl.java         |   21 +-
  .../store/kahadb/scheduler/JobLocation.java     |   77 +-
  .../scheduler/JobLocationsMarshaller.java       |   53 +
  .../kahadb/scheduler/JobSchedulerImpl.java      |  837 ++++++++------
  .../scheduler/JobSchedulerKahaDBMetaData.java   |  246 ++++
  .../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++-----
  .../scheduler/UnknownStoreVersionException.java |   24 +
  .../kahadb/scheduler/legacy/LegacyJobImpl.java  |   72 ++
  .../scheduler/legacy/LegacyJobLocation.java     |  296 +++++
  .../legacy/LegacyJobSchedulerImpl.java          |  222 ++++
  .../legacy/LegacyJobSchedulerStoreImpl.java     |  378 ++++++
  .../scheduler/legacy/LegacyStoreReplayer.java   |  155 +++
  .../src/main/proto/journal-data.proto           |   61 +
  .../apache/activemq/leveldb/LevelDBStore.scala  |    5 +
  .../leveldb/replicated/ProxyLevelDBStore.scala  |    5 +
  .../JobSchedulerBrokerShutdownTest.java         |    1 +
  .../JobSchedulerJmxManagementTests.java         |  155 +++
  .../scheduler/JobSchedulerManagementTest.java   |   84 +-
  .../JobSchedulerStoreCheckpointTest.java        |  125 ++
  .../broker/scheduler/JobSchedulerStoreTest.java |   46 +-
  .../broker/scheduler/JobSchedulerTest.java      |   36 +
  .../scheduler/JobSchedulerTestSupport.java      |  112 ++
  .../KahaDBSchedulerIndexRebuildTest.java        |  179 +++
  .../KahaDBSchedulerMissingJournalLogsTest.java  |  204 ++++
  .../scheduler/SchedulerDBVersionTest.java       |  164 +++
  .../src/test/resources/log4j.properties         |    1 +
  .../activemq/store/schedulerDB/legacy/db-1.log  |  Bin 0 -> 524288 bytes
  .../store/schedulerDB/legacy/scheduleDB.data    |  Bin 0 -> 20480 bytes
  .../store/schedulerDB/legacy/scheduleDB.redo    |  Bin 0 -> 16408 bytes
  52 files changed, 5584 insertions(+), 911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 00d4abd..5becec2 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -1861,6 +1861,23 @@ public class BrokerService implements Service {
try {
                  PersistenceAdapter pa = getPersistenceAdapter();
+                if (pa != null) {
+                    this.jobSchedulerStore = pa.createJobSchedulerStore();
+                    
jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+                    configureService(jobSchedulerStore);
+                    jobSchedulerStore.start();
+                    return this.jobSchedulerStore;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (UnsupportedOperationException ex) {
+                // It's ok if the store doesn't implement a scheduler.
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            try {
+                PersistenceAdapter pa = getPersistenceAdapter();
                  if (pa != null && pa instanceof JobSchedulerStore) {
                      this.jobSchedulerStore = (JobSchedulerStore) pa;
                      configureService(jobSchedulerStore);
@@ -1870,9 +1887,13 @@ public class BrokerService implements Service {
                  throw new RuntimeException(e);
              }
+ // Load the KahaDB store as a last resort, this only works if KahaDB is
+            // included at runtime, otherwise this will fail.  User should 
disable
+            // scheduler support if this fails.
              try {
-                String clazz = 
"org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
-                jobSchedulerStore = (JobSchedulerStore) 
getClass().getClassLoader().loadClass(clazz).newInstance();
+                String clazz = 
"org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+                PersistenceAdapter adaptor = 
(PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+                jobSchedulerStore = adaptor.createJobSchedulerStore();
                  jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
                  configureService(jobSchedulerStore);
                  jobSchedulerStore.start();

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index 9e5a1fb..2118a96 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -16,23 +16,39 @@
   */
  package org.apache.activemq.broker.jmx;
+import java.util.List;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
  import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
  import org.apache.activemq.broker.scheduler.Job;
  import org.apache.activemq.broker.scheduler.JobScheduler;
  import org.apache.activemq.broker.scheduler.JobSupport;
-import javax.management.openmbean.*;
-import java.io.IOException;
-import java.util.List;
-
+/**
+ * MBean object that can be used to manage a single instance of a 
JobScheduler.  The object
+ * provides methods for querying for jobs and removing some or all of the jobs 
that are
+ * scheduled in the managed store.
+ */
  public class JobSchedulerView implements JobSchedulerViewMBean {
private final JobScheduler jobScheduler; + /**
+     * Creates a new instance of the JobScheduler management MBean.
+     *
+     * @param jobScheduler
+     *        The scheduler instance to manage.
+     */
      public JobSchedulerView(JobScheduler jobScheduler) {
          this.jobScheduler = jobScheduler;
      }
+ @Override
      public TabularData getAllJobs() throws Exception {
          OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
          CompositeType ct = factory.getCompositeType();
@@ -45,6 +61,7 @@ public class JobSchedulerView implements 
JobSchedulerViewMBean {
          return rc;
      }
+ @Override
      public TabularData getAllJobs(String startTime, String finishTime) throws 
Exception {
          OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
          CompositeType ct = factory.getCompositeType();
@@ -59,6 +76,7 @@ public class JobSchedulerView implements 
JobSchedulerViewMBean {
          return rc;
      }
+ @Override
      public TabularData getNextScheduleJobs() throws Exception {
          OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
          CompositeType ct = factory.getCompositeType();
@@ -71,31 +89,51 @@ public class JobSchedulerView implements 
JobSchedulerViewMBean {
          return rc;
      }
+ @Override
      public String getNextScheduleTime() throws Exception {
          long time = this.jobScheduler.getNextScheduleTime();
          return JobSupport.getDateTime(time);
      }
+ @Override
      public void removeAllJobs() throws Exception {
          this.jobScheduler.removeAllJobs();
-
      }
+ @Override
      public void removeAllJobs(String startTime, String finishTime) throws 
Exception {
          long start = JobSupport.getDataTime(startTime);
          long finish = JobSupport.getDataTime(finishTime);
          this.jobScheduler.removeAllJobs(start, finish);
+    }
+ @Override
+    public void removeAllJobsAtScheduledTime(String time) throws Exception {
+        long removeAtTime = JobSupport.getDataTime(time);
+        this.jobScheduler.remove(removeAtTime);
      }
+ @Override
+    public void removeJobAtScheduledTime(String time) throws Exception {
+        removeAllJobsAtScheduledTime(time);
+    }
+
+    @Override
      public void removeJob(String jobId) throws Exception {
          this.jobScheduler.remove(jobId);
-
      }
- public void removeJobAtScheduledTime(String time) throws IOException {
-        // TODO Auto-generated method stub
+    @Override
+    public int getExecutionCount(String jobId) throws Exception {
+        int result = 0;
- }
+        List<Job> jobs = this.jobScheduler.getAllJobs();
+        for (Job job : jobs) {
+            if (job.getJobId().equals(jobId)) {
+                result = job.getExecutionCount();
+            }
+        }
+ return result;
+    }
  }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index f5745ea..76a7926 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx;
import javax.management.openmbean.TabularData; -
-
  public interface JobSchedulerViewMBean {
+
      /**
-     * remove all jobs scheduled to run at this time
+     * Remove all jobs scheduled to run at this time.  If there are no jobs 
scheduled
+     * at the given time this methods returns without making any modifications 
to the
+     * scheduler store.
+     *
       * @param time
-     * @throws Exception
+     *        the string formated time that should be used to remove jobs.
+     *
+     * @throws Exception if an error occurs while performing the remove.
+     *
+     * @deprecated use removeAllJobsAtScheduledTime instead as it is more 
explicit about what
+     *             the method is actually doing.
       */
+    @Deprecated
      @MBeanInfo("remove jobs with matching execution time")
      public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd 
hh:mm:ss")String time) throws Exception;
/**
-     * remove a job with the matching jobId
+     * Remove all jobs scheduled to run at this time.  If there are no jobs 
scheduled
+     * at the given time this methods returns without making any modifications 
to the
+     * scheduler store.
+     *
+     * @param time
+     *        the string formated time that should be used to remove jobs.
+     *
+     * @throws Exception if an error occurs while performing the remove.
+     */
+    @MBeanInfo("remove jobs with matching execution time")
+    public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd 
hh:mm:ss")String time) throws Exception;
+
+    /**
+     * Remove a job with the matching jobId.  If the method does not find a 
matching job
+     * then it returns without throwing an error or making any modifications 
to the job
+     * scheduler store.
+     *
       * @param jobId
-     * @throws Exception
+     *        the Job Id to remove from the scheduler store.
+     *
+     * @throws Exception if an error occurs while attempting to remove the Job.
       */
      @MBeanInfo("remove jobs with matching jobId")
      public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws 
Exception;
-
+
      /**
-     * remove all the Jobs from the scheduler
-     * @throws Exception
+     * Remove all the Jobs from the scheduler,
+     *
+     * @throws Exception if an error occurs while purging the store.
       */
      @MBeanInfo("remove all scheduled jobs")
      public abstract void removeAllJobs() throws Exception;
-
+
      /**
-     * remove all the Jobs from the scheduler that are due between the start 
and finish times
-     * @param start time
-     * @param finish time
-     * @throws Exception
+     * Remove all the Jobs from the scheduler that are due between the start 
and finish times.
+     *
+     * @param start
+     *        the starting time to remove jobs from.
+     * @param finish
+     *        the finish time for the remove operation.
+     *
+     * @throws Exception if an error occurs while attempting to remove the 
jobs.
       */
      @MBeanInfo("remove all scheduled jobs between time ranges ")
      public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String 
start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
-
-
      /**
-     * Get the next time jobs will be fired
-     * @return the time in milliseconds
-     * @throws Exception
+     * Get the next time jobs will be fired from this scheduler store.
+     *
+     * @return the time in milliseconds of the next job to execute.
+     *
+     * @throws Exception if an error occurs while accessing the store.
       */
      @MBeanInfo("get the next time a job is due to be scheduled ")
      public abstract String getNextScheduleTime() throws Exception;
-
+
+    /**
+     * Gets the number of times a scheduled Job has been executed.
+     *
+     * @return the total number of time a scheduled job has executed.
+     *
+     * @throws Exception if an error occurs while querying for the Job.
+     */
+    @MBeanInfo("get the next time a job is due to be scheduled ")
+    public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) 
throws Exception;
+
      /**
-     * Get all the jobs scheduled to run next
+     * Get all the jobs scheduled to run next.
+     *
       * @return a list of jobs that will be scheduled next
-     * @throws Exception
+     *
+     * @throws Exception if an error occurs while reading the scheduler store.
       */
      @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
      public abstract TabularData getNextScheduleJobs() throws Exception;
-
-    /**
-     * Get all the outstanding Jobs
-     * @return a  table of all jobs
-     * @throws Exception
+ /**
+     * Get all the outstanding Jobs that are scheduled in this scheduler store.
+     *
+     * @return a table of all jobs in this scheduler store.
+     *
+     * @throws Exception if an error occurs while reading the store.
       */
      @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
      public abstract TabularData getAllJobs() throws Exception;
-
+
      /**
-     * Get all outstanding jobs due to run between start and finish
+     * Get all outstanding jobs due to run between start and finish time range.
+     *
       * @param start
+     *        the starting time range to query the store for jobs.
       * @param finish
-     * @return a table of jobs in the range
-     * @throws Exception
-
+     *        the ending time of this query for scheduled jobs.
+     *
+     * @return a table of jobs in the range given.
+     *
+     * @throws Exception if an error occurs while querying the scheduler store.
       */
      @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not 
HTML friendly ")
      public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd 
hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws 
Exception;
+
  }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
index 7b28a5b..047fe23 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
@@ -16,7 +16,12 @@
   */
  package org.apache.activemq.broker.scheduler;
-
+/**
+ * Interface for a scheduled Job object.
+ *
+ * Each Job is identified by a unique Job Id which can be used to reference 
the Job
+ * in the Job Scheduler store for updates or removal.
+ */
  public interface Job {
/**
@@ -38,11 +43,12 @@ public interface Job {
       * @return the Delay
       */
      public abstract long getDelay();
+
      /**
       * @return the period
       */
      public abstract long getPeriod();
-
+
      /**
       * @return the cron entry
       */
@@ -52,17 +58,24 @@ public interface Job {
       * @return the payload
       */
      public abstract byte[] getPayload();
-
+
      /**
       * Get the start time as a Date time string
       * @return the date time
       */
      public String getStartTime();
-
+
      /**
-     * Get the time the job is next due to execute
+     * Get the time the job is next due to execute
       * @return the date time
       */
      public String getNextExecutionTime();
+ /**
+     * Gets the total number of times this job has executed.
+     *
+     * @returns the number of times this job has been executed.
+     */
+    public int getExecutionCount();
+
  }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
index c53d9c6..a453595 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
@@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler;
import org.apache.activemq.util.ByteSequence; +/**
+ * Job event listener interface. Provides event points for Job related events
+ * such as job ready events.
+ */
  public interface JobListener {
-
+
      /**
-     * A Job that has been scheduled is now ready
-     * @param id
+     * A Job that has been scheduled is now ready to be fired.  The Job is 
passed
+     * in its raw byte form and must be un-marshaled before being delivered.
+     *
+     * @param jobId
+     *        The unique Job Id of the Job that is ready to fire.
       * @param job
+     *        The job that is now ready, delivered in byte form.
       */
-    public void scheduledJob(String id,ByteSequence job);
+    public void scheduledJob(String id, ByteSequence job);
}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
index 2e96eae..e951861 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
@@ -46,20 +46,25 @@ public interface JobScheduler {
      void stopDispatching() throws Exception;
/**
-     * Add a Job listener
+     * Add a Job listener which will receive events related to scheduled jobs.
+     *
+     * @param listener
+     *      The job listener to add.
       *
-     * @param l
       * @throws Exception
       */
-    void addListener(JobListener l) throws Exception;
+    void addListener(JobListener listener) throws Exception;
/**
-     * remove a JobListener
+     * remove a JobListener that was previously registered.  If the given 
listener is not in
+     * the registry this method has no effect.
+     *
+     * @param listener
+     *      The listener that should be removed from the listener registry.
       *
-     * @param l
       * @throws Exception
       */
-    void removeListener(JobListener l) throws Exception;
+    void removeListener(JobListener listener) throws Exception;
/**
       * Add a job to be scheduled
@@ -70,7 +75,8 @@ public interface JobScheduler {
       *            the message to be sent when the job is scheduled
       * @param delay
       *            the time in milliseconds before the job will be run
-     * @throws Exception
+     *
+     * @throws Exception if an error occurs while scheduling the Job.
       */
      void schedule(String jobId, ByteSequence payload, long delay) throws 
Exception;
@@ -82,8 +88,9 @@ public interface JobScheduler {
       * @param payload
       *            the message to be sent when the job is scheduled
       * @param cronEntry
-     *            - cron entry
-     * @throws Exception
+     *            The cron entry to use to schedule this job.
+     *
+     * @throws Exception if an error occurs while scheduling the Job.
       */
      void schedule(String jobId, ByteSequence payload, String cronEntry) 
throws Exception;
@@ -95,7 +102,7 @@ public interface JobScheduler {
       * @param payload
       *            the message to be sent when the job is scheduled
       * @param cronEntry
-     *            - cron entry
+     *            cron entry
       * @param delay
       *            time in ms to wait before scheduling
       * @param period
@@ -110,6 +117,8 @@ public interface JobScheduler {
       * remove all jobs scheduled to run at this time
       *
       * @param time
+     *      The UTC time to use to remove a batch of scheduled Jobs.
+     *
       * @throws Exception
       */
      void remove(long time) throws Exception;
@@ -118,7 +127,9 @@ public interface JobScheduler {
       * remove a job with the matching jobId
       *
       * @param jobId
-     * @throws Exception
+     *      The unique Job Id to search for and remove from the scheduled set 
of jobs.
+     *
+     * @throws Exception if an error occurs while removing the Job.
       */
      void remove(String jobId) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
index d46d04a..24a216a 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
@@ -21,6 +21,12 @@ import java.util.List;
import org.apache.activemq.util.ByteSequence; +/**
+ * A wrapper for instances of the JobScheduler interface that ensures that 
methods
+ * provides safe and sane return values and can deal with null values being 
passed
+ * in etc.  Provides a measure of safety when using unknown implementations of 
the
+ * JobSchedulerStore which might not always do the right thing.
+ */
  public class JobSchedulerFacade implements JobScheduler {
private final SchedulerBroker broker;

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
index 3cbc367..c6863c7 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
@@ -26,13 +26,56 @@ import org.apache.activemq.Service;
   */
  public interface JobSchedulerStore extends Service {
+ /**
+     * Gets the location where the Job Scheduler will write the persistent 
data used
+     * to preserve and recover scheduled Jobs.
+     *
+     * If the scheduler implementation does not utilize a file system based 
store this
+     * method returns null.
+     *
+     * @return the directory where persistent store data is written.
+     */
      File getDirectory();
+ /**
+     * Sets the directory where persistent store data will be written.  This 
method
+     * must be called before the scheduler store is started to have any effect.
+     *
+     * @param directory
+     *      The directory where the job scheduler store is to be located.
+     */
      void setDirectory(File directory);
+ /**
+     * The size of the current store on disk if the store utilizes a disk 
based store
+     * mechanism.
+     *
+     * @return the current store size on disk.
+     */
      long size();
+ /**
+     * Returns the JobScheduler instance identified by the given name.
+     *
+     * @param name
+     *        the name of the JobScheduler instance to lookup.
+     *
+     * @return the named JobScheduler or null if none exists with the given 
name.
+     *
+     * @throws Exception if an error occurs while loading the named scheduler.
+     */
      JobScheduler getJobScheduler(String name) throws Exception;
+ /**
+     * Removes the named JobScheduler if it exists, purging all scheduled 
messages
+     * assigned to it.
+     *
+     * @param name
+     *        the name of the scheduler instance to remove.
+     *
+     * @return true if there was a scheduler with the given name to remove.
+     *
+     * @throws Exception if an error occurs while removing the scheduler.
+     */
      boolean removeJobScheduler(String name) throws Exception;
  }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
index 6b78d77..fc5b8dd 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
@@ -20,7 +20,11 @@ import java.text.DateFormat;
  import java.text.SimpleDateFormat;
  import java.util.Date;
+/**
+ * A class to provide common Job Scheduler related methods.
+ */
  public class JobSupport {
+
      public static String getDateTime(long value) {
          DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          Date date = new Date(value);
@@ -32,5 +36,4 @@ public class JobSupport {
           Date date = dfm.parse(value);
           return date.getTime();
       }
-
  }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 31efd32..01a9634 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.activemq.Service;
  import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
  import org.apache.activemq.command.ActiveMQDestination;
  import org.apache.activemq.command.ActiveMQQueue;
  import org.apache.activemq.command.ActiveMQTopic;
@@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage;
  /**
   * Adapter to the actual persistence mechanism used with ActiveMQ
   *
- *
+ *
   */
  public interface PersistenceAdapter extends Service {
/**
-     * Returns a set of all the {@link 
org.apache.activemq.command.ActiveMQDestination}
-     * objects that the persistence store is aware exist.
+     * Returns a set of all the
+     * {@link org.apache.activemq.command.ActiveMQDestination} objects that the
+     * persistence store is aware exist.
       *
       * @return active destinations
       */
      Set<ActiveMQDestination> getDestinations();
/**
-     * Factory method to create a new queue message store with the given 
destination name
+     * Factory method to create a new queue message store with the given
+     * destination name
+     *
       * @param destination
       * @return the message store
-     * @throws IOException
+     * @throws IOException
       */
      MessageStore createQueueMessageStore(ActiveMQQueue destination) throws 
IOException;
/**
-     * Factory method to create a new topic message store with the given 
destination name
-     * @param destination
+     * Factory method to create a new topic message store with the given
+     * destination name
+     *
+     * @param destination
       * @return the topic message store
-     * @throws IOException
+     * @throws IOException
       */
      TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) 
throws IOException;
/**
+     * Creates and returns a new Job Scheduler store instance.
+     *
+     * @return a new JobSchedulerStore instance if this Persistence adapter 
provides its own.
+     *
+     * @throws IOException If an error occurs while creating the new 
JobSchedulerStore.
+     * @throws UnsupportedOperationException If this adapter does not provide 
its own
+     *                                       scheduler store implementation.
+     */
+    JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException;
+
+    /**
       * Cleanup method to remove any state associated with the given 
destination.
       * This method does not stop the message store (it might not be cached).
-     * @param destination Destination to forget
+     *
+     * @param destination
+     *            Destination to forget
       */
      void removeQueueMessageStore(ActiveMQQueue destination);
/**
       * Cleanup method to remove any state associated with the given 
destination
       * This method does not stop the message store (it might not be cached).
-     * @param destination Destination to forget
+     *
+     * @param destination
+     *            Destination to forget
       */
      void removeTopicMessageStore(ActiveMQTopic destination);
/**
-     * Factory method to create a new persistent prepared transaction store 
for XA recovery
+     * Factory method to create a new persistent prepared transaction store for
+     * XA recovery
+     *
       * @return transaction store
-     * @throws IOException
+     * @throws IOException
       */
      TransactionStore createTransactionStore() throws IOException;
/**
-     * This method starts a transaction on the persistent storage - which is 
nothing to
-     * do with JMS or XA transactions - its purely a mechanism to perform 
multiple writes
-     * to a persistent store in 1 transaction as a performance optimization.
+     * This method starts a transaction on the persistent storage - which is
+     * nothing to do with JMS or XA transactions - its purely a mechanism to
+     * perform multiple writes to a persistent store in 1 transaction as a
+     * performance optimization.
       * <p/>
-     * Typically one transaction will require one disk synchronization point 
and so for
-     * real high performance its usually faster to perform many writes within 
the same
-     * transaction to minimize latency caused by disk synchronization. This is 
especially
-     * true when using tools like Berkeley Db or embedded JDBC servers.
-     * @param context
-     * @throws IOException
+     * Typically one transaction will require one disk synchronization point 
and
+     * so for real high performance its usually faster to perform many writes
+     * within the same transaction to minimize latency caused by disk
+     * synchronization. This is especially true when using tools like Berkeley
+     * Db or embedded JDBC servers.
+     *
+     * @param context
+     * @throws IOException
       */
      void beginTransaction(ConnectionContext context) throws IOException;
-
      /**
       * Commit a persistence transaction
-     * @param context
-     * @throws IOException
+     *
+     * @param context
+     * @throws IOException
       *
       * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
       */
@@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service {
/**
       * Rollback a persistence transaction
-     * @param context
-     * @throws IOException
+     *
+     * @param context
+     * @throws IOException
       *
       * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
       */
      void rollbackTransaction(ConnectionContext context) throws IOException;
-
+
      /**
-     *
+     *
       * @return last broker sequence
       * @throws IOException
       */
      long getLastMessageBrokerSequenceId() throws IOException;
-
+
      /**
       * Delete's all the messages in the persistent store.
-     *
+     *
       * @throws IOException
       */
      void deleteAllMessages() throws IOException;
-
+
      /**
-     * @param usageManager The UsageManager that is controlling the broker's 
memory usage.
+     * @param usageManager
+     *            The UsageManager that is controlling the broker's memory
+     *            usage.
       */
      void setUsageManager(SystemUsage usageManager);
-
+
      /**
       * Set the name of the broker using the adapter
+     *
       * @param brokerName
       */
      void setBrokerName(String brokerName);
-
+
      /**
       * Set the directory where any data files should be created
+     *
       * @param dir
       */
      void setDirectory(File dir);
@@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service {
       * @return the directory used by the persistence adaptor
       */
      File getDirectory();
-
+
      /**
       * checkpoint any
-     * @param sync
-     * @throws IOException
+     *
+     * @param sync
+     * @throws IOException
       *
       */
      void checkpoint(boolean sync) throws IOException;
-
+
      /**
       * A hint to return the size of the store on disk
+     *
       * @return disk space used in bytes of 0 if not implemented
       */
      long size();
/**
-     * return the last stored producer sequenceId for this producer Id
-     * used to suppress duplicate sends on failover reconnect at the transport
-     * when a reconnect occurs
-     * @param id the producerId to find a sequenceId for
+     * return the last stored producer sequenceId for this producer Id used to
+     * suppress duplicate sends on failover reconnect at the transport when a
+     * reconnect occurs
+     *
+     * @param id
+     *            the producerId to find a sequenceId for
       * @return the last stored sequence id or -1 if no suppression needed
       */
      long getLastProducerSequenceId(ProducerId id) throws IOException;

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 0fd6bfc..73ea104 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -24,6 +24,7 @@ import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
  import org.apache.activemq.command.ActiveMQDestination;
  import org.apache.activemq.command.ActiveMQQueue;
  import org.apache.activemq.command.ActiveMQTopic;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
/**
   * @org.apache.xbean.XBean
- *
+ *
   */
  public class MemoryPersistenceAdapter implements PersistenceAdapter {
      private static final Logger LOG = 
LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
      ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new 
ConcurrentHashMap<ActiveMQDestination, MessageStore>();
      private boolean useExternalMessageReferences;
+ @Override
      public Set<ActiveMQDestination> getDestinations() {
          Set<ActiveMQDestination> rc = new 
HashSet<ActiveMQDestination>(queues.size() + topics.size());
          for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); 
iter.hasNext();) {
@@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
          return new MemoryPersistenceAdapter();
      }
+ @Override
      public MessageStore createQueueMessageStore(ActiveMQQueue destination) 
throws IOException {
          MessageStore rc = queues.get(destination);
          if (rc == null) {
@@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
          return rc;
      }
+ @Override
      public TopicMessageStore createTopicMessageStore(ActiveMQTopic 
destination) throws IOException {
          TopicMessageStore rc = topics.get(destination);
          if (rc == null) {
@@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
       *
       * @param destination Destination to forget
       */
+    @Override
      public void removeQueueMessageStore(ActiveMQQueue destination) {
          queues.remove(destination);
      }
@@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
       *
       * @param destination Destination to forget
       */
+    @Override
      public void removeTopicMessageStore(ActiveMQTopic destination) {
          topics.remove(destination);
      }
+ @Override
      public TransactionStore createTransactionStore() throws IOException {
          if (transactionStore == null) {
              transactionStore = new MemoryTransactionStore(this);
@@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
          return transactionStore;
      }
+ @Override
      public void beginTransaction(ConnectionContext context) {
      }
+ @Override
      public void commitTransaction(ConnectionContext context) {
      }
+ @Override
      public void rollbackTransaction(ConnectionContext context) {
      }
+ @Override
      public void start() throws Exception {
      }
+ @Override
      public void stop() throws Exception {
      }
+ @Override
      public long getLastMessageBrokerSequenceId() throws IOException {
          return 0;
      }
+ @Override
      public void deleteAllMessages() throws IOException {
          for (Iterator<TopicMessageStore> iter = topics.values().iterator(); 
iter.hasNext();) {
              MemoryMessageStore store = asMemoryMessageStore(iter.next());
@@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
       * @param usageManager The UsageManager that is controlling the broker's
       *                memory usage.
       */
+    @Override
      public void setUsageManager(SystemUsage usageManager) {
      }
+ @Override
      public String toString() {
          return "MemoryPersistenceAdapter";
      }
+ @Override
      public void setBrokerName(String brokerName) {
      }
+ @Override
      public void setDirectory(File dir) {
      }
-
+
+    @Override
      public File getDirectory(){
          return null;
      }
+ @Override
      public void checkpoint(boolean sync) throws IOException {
      }
-
+
+    @Override
      public long size(){
          return 0;
      }
-
+
      public void setCreateTransactionStore(boolean create) throws IOException {
          if (create) {
              createTransactionStore();
          }
      }
+ @Override
      public long getLastProducerSequenceId(ProducerId id) {
          // memory map does duplicate suppression
          return -1;
      }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
+        // We could eventually implement an in memory scheduler.
+        throw new UnsupportedOperationException();
+    }
  }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java 
b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
index a623de9..2a70194 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
@@ -61,8 +61,9 @@ public final class IOHelper {
      }
/**
-     * Converts any string into a string that is safe to use as a file name.
-     * The result will only include ascii characters and numbers, and the "-","_", and 
"." characters.
+     * Converts any string into a string that is safe to use as a file name. 
The
+     * result will only include ascii characters and numbers, and the "-","_",
+     * and "." characters.
       *
       * @param name
       * @return
@@ -76,15 +77,16 @@ public final class IOHelper {
      }
/**
-     * Converts any string into a string that is safe to use as a file name.
-     * The result will only include ascii characters and numbers, and the "-","_", and 
"." characters.
+     * Converts any string into a string that is safe to use as a file name. 
The
+     * result will only include ascii characters and numbers, and the "-","_",
+     * and "." characters.
       *
       * @param name
       * @param dirSeparators
       * @param maxFileLength
       * @return
       */
-    public static String toFileSystemSafeName(String name,boolean 
dirSeparators,int maxFileLength) {
+    public static String toFileSystemSafeName(String name, boolean 
dirSeparators, int maxFileLength) {
          int size = name.length();
          StringBuffer rc = new StringBuffer(size * 2);
          for (int i = 0; i < size; i++) {
@@ -92,8 +94,7 @@ public final class IOHelper {
              boolean valid = c >= 'a' && c <= 'z';
              valid = valid || (c >= 'A' && c <= 'Z');
              valid = valid || (c >= '0' && c <= '9');
-            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
-                    ||(dirSeparators && ( (c == '/') || (c == '\\')));
+            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || 
(dirSeparators && ((c == '/') || (c == '\\')));
if (valid) {
                  rc.append(c);
@@ -105,7 +106,7 @@ public final class IOHelper {
          }
          String result = rc.toString();
          if (result.length() > maxFileLength) {
-            result = 
result.substring(result.length()-maxFileLength,result.length());
+            result = result.substring(result.length() - maxFileLength, 
result.length());
          }
          return result;
      }
@@ -168,8 +169,7 @@ public final class IOHelper {
              } else {
                  for (int i = 0; i < files.length; i++) {
                      File file = files[i];
-                    if (file.getName().equals(".")
-                            || file.getName().equals("..")) {
+                    if (file.getName().equals(".") || 
file.getName().equals("..")) {
                          continue;
                      }
                      if (file.isDirectory()) {
@@ -190,6 +190,27 @@ public final class IOHelper {
          }
      }
+ public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException {
+        if (!srcDirectory.isDirectory()) {
+            throw new IOException("source is not a directory");
+        }
+
+        if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
+            throw new IOException("target exists and is not a directory");
+        } else {
+            mkdirs(targetDirectory);
+        }
+
+        List<File> filesToMove = new ArrayList<File>();
+        getFiles(srcDirectory, filesToMove, filter);
+
+        for (File file : filesToMove) {
+            if (!file.isDirectory()) {
+                moveFile(file, targetDirectory);
+            }
+        }
+    }
+
      public static void copyFile(File src, File dest) throws IOException {
          copyFile(src, dest, null);
      }
@@ -222,32 +243,32 @@ public final class IOHelper {
          File parent = src.getParentFile();
          String fromPath = from.getAbsolutePath();
          if (parent.getAbsolutePath().equals(fromPath)) {
-            //one level down
+            // one level down
              result = to;
-        }else {
+        } else {
              String parentPath = parent.getAbsolutePath();
              String path = parentPath.substring(fromPath.length());
-            result = new File(to.getAbsolutePath()+File.separator+path);
+            result = new File(to.getAbsolutePath() + File.separator + path);
          }
          return result;
      }
- static List<File> getFiles(File dir,FilenameFilter filter){
+    static List<File> getFiles(File dir, FilenameFilter filter) {
          List<File> result = new ArrayList<File>();
-        getFiles(dir,result,filter);
+        getFiles(dir, result, filter);
          return result;
      }
- static void getFiles(File dir,List<File> list,FilenameFilter filter) {
+    static void getFiles(File dir, List<File> list, FilenameFilter filter) {
          if (!list.contains(dir)) {
              list.add(dir);
-            String[] fileNames=dir.list(filter);
-            for (int i =0; i < fileNames.length;i++) {
-                File f = new File(dir,fileNames[i]);
+            String[] fileNames = dir.list(filter);
+            for (int i = 0; i < fileNames.length; i++) {
+                File f = new File(dir, fileNames[i]);
                  if (f.isFile()) {
                      list.add(f);
-                }else {
-                    getFiles(dir,list,filter);
+                } else {
+                    getFiles(dir, list, filter);
                  }
              }
          }
@@ -286,12 +307,13 @@ public final class IOHelper {
      public static void mkdirs(File dir) throws IOException {
          if (dir.exists()) {
              if (!dir.isDirectory()) {
-                throw new IOException("Failed to create directory '" + dir +"', 
regular file already existed with that name");
+                throw new IOException("Failed to create directory '" + dir +
+                                      "', regular file already existed with that 
name");
              }
} else {
              if (!dir.mkdirs()) {
-                throw new IOException("Failed to create directory '" + 
dir+"'");
+                throw new IOException("Failed to create directory '" + dir + 
"'");
              }
          }
      }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 7ff4ae0..a3a8250 100755
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
  import org.apache.activemq.broker.BrokerService;
  import org.apache.activemq.broker.ConnectionContext;
  import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
  import org.apache.activemq.command.ActiveMQDestination;
  import org.apache.activemq.command.ActiveMQQueue;
  import org.apache.activemq.command.ActiveMQTopic;
@@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends 
DataSourceServiceSupport implements
          this.lockDataSource = dataSource;
      }
+ @Override
      public BrokerService getBrokerService() {
          return brokerService;
      }
@@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends 
DataSourceServiceSupport implements
          }
          return result;
      }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
  }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
index 565fc9f..cc5282f 100755
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
+
  import org.apache.activeio.journal.InvalidRecordLocationException;
  import org.apache.activeio.journal.Journal;
  import org.apache.activeio.journal.JournalEventListener;
@@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet;
  import org.apache.activemq.broker.BrokerService;
  import org.apache.activemq.broker.BrokerServiceAware;
  import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
  import org.apache.activemq.command.ActiveMQDestination;
  import org.apache.activemq.command.ActiveMQQueue;
  import org.apache.activemq.command.ActiveMQTopic;
@@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory;
   * An implementation of {@link PersistenceAdapter} designed for use with a
   * {@link Journal} and then check pointing asynchronously on a timeout with 
some
   * other long term persistent storage.
- *
+ *
   * @org.apache.xbean.XBean
- *
+ *
   */
  public class JournalPersistenceAdapter implements PersistenceAdapter, 
JournalEventListener, UsageListener, BrokerServiceAware {
private BrokerService brokerService;
-       
+
      protected Scheduler scheduler;
      private static final Logger LOG = 
LoggerFactory.getLogger(JournalPersistenceAdapter.class);
@@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
      private TaskRunnerFactory taskRunnerFactory;
      private File directory;
- public JournalPersistenceAdapter() {
+    public JournalPersistenceAdapter() {
      }
-
+
      public JournalPersistenceAdapter(Journal journal, PersistenceAdapter 
longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
          setJournal(journal);
          setTaskRunnerFactory(taskRunnerFactory);
@@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
          this.journal = journal;
          journal.setJournalEventListener(this);
      }
-
+
      public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) 
{
          this.longTermPersistence = longTermPersistence;
      }
-
+
      final Runnable createPeriodicCheckpointTask() {
          return new Runnable() {
+            @Override
              public void run() {
                  long lastTime = 0;
                  synchronized (this) {
@@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
       * @param usageManager The UsageManager that is controlling the
       *                destination's memory usage.
       */
+    @Override
      public void setUsageManager(SystemUsage usageManager) {
          this.usageManager = usageManager;
          longTermPersistence.setUsageManager(usageManager);
      }
+ @Override
      public Set<ActiveMQDestination> getDestinations() {
          Set<ActiveMQDestination> destinations = new 
HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
          destinations.addAll(queues.keySet());
@@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
          }
      }
+ @Override
      public MessageStore createQueueMessageStore(ActiveMQQueue destination) 
throws IOException {
          JournalMessageStore store = queues.get(destination);
          if (store == null) {
@@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
          return store;
      }
+ @Override
      public TopicMessageStore createTopicMessageStore(ActiveMQTopic 
destinationName) throws IOException {
          JournalTopicMessageStore store = topics.get(destinationName);
          if (store == null) {
@@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
       *
       * @param destination Destination to forget
       */
+    @Override
      public void removeQueueMessageStore(ActiveMQQueue destination) {
          queues.remove(destination);
      }
@@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
       *
       * @param destination Destination to forget
       */
+    @Override
      public void removeTopicMessageStore(ActiveMQTopic destination) {
          topics.remove(destination);
      }
+ @Override
      public TransactionStore createTransactionStore() throws IOException {
          return transactionStore;
      }
+ @Override
      public long getLastMessageBrokerSequenceId() throws IOException {
          return longTermPersistence.getLastMessageBrokerSequenceId();
      }
+ @Override
      public void beginTransaction(ConnectionContext context) throws 
IOException {
          longTermPersistence.beginTransaction(context);
      }
+ @Override
      public void commitTransaction(ConnectionContext context) throws 
IOException {
          longTermPersistence.commitTransaction(context);
      }
+ @Override
      public void rollbackTransaction(ConnectionContext context) throws 
IOException {
          longTermPersistence.rollbackTransaction(context);
      }
+ @Override
      public synchronized void start() throws Exception {
          if (!started.compareAndSet(false, true)) {
              return;
@@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
          }
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
+            @Override
              public boolean iterate() {
                  return doCheckpoint();
              }
          }, "ActiveMQ Journal Checkpoint Worker");
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            @Override
              public Thread newThread(Runnable runable) {
                  Thread t = new Thread(runable, "Journal checkpoint worker");
                  t.setPriority(7);
@@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
} + @Override
      public void stop() throws Exception {
this.usageManager.getMemoryUsage().removeUsageListener(this);
@@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
      /**
       * The Journal give us a call back so that we can move old data out of the
       * journal. Taking a checkpoint does this for us.
-     *
+     *
       * @see 
org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
       */
+    @Override
      public void overflowNotification(RecordLocation safeLocation) {
          checkpoint(false, true);
      }
/**
       * When we checkpoint we move all the journalled data to long term 
storage.
-     *
+     *
       */
      public void checkpoint(boolean sync, boolean fullCheckpoint) {
          try {
@@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
          }
      }
+ @Override
      public void checkpoint(boolean sync) {
          checkpoint(sync, sync);
      }
/**
       * This does the actual checkpoint.
-     *
+     *
       * @return
       */
      public boolean doCheckpoint() {
@@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
              // We do many partial checkpoints (fullCheckpoint==false) to move
              // topic messages
              // to long term store as soon as possible.
-            //
+            //
              // We want to avoid doing that for queue messages since removes 
the
              // come in the same
              // checkpoint cycle will nullify the previous message add.
@@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
                      try {
                          final JournalMessageStore ms = iterator.next();
                          FutureTask<RecordLocation> task = new 
FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                            @Override
                              public RecordLocation call() throws Exception {
                                  return ms.checkpoint();
                              }
@@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
                  try {
                      final JournalTopicMessageStore ms = iterator.next();
                      FutureTask<RecordLocation> task = new 
FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                        @Override
                          public RecordLocation call() throws Exception {
                              return ms.checkpoint();
                          }
@@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
      /**
       * Move all the messages that were in the journal into long term storage. 
We
       * just replay and do a checkpoint.
-     *
+     *
       * @throws IOException
       * @throws IOException
       * @throws InvalidRecordLocationException
@@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
      public RecordLocation writeCommand(DataStructure command, boolean sync) 
throws IOException {
          if (started.get()) {
              try {
-                   return journal.write(toPacket(wireFormat.marshal(command)), 
sync);
+                return journal.write(toPacket(wireFormat.marshal(command)), 
sync);
              } catch (IOException ioe) {
-                   LOG.error("Cannot write to the journal", ioe);
-                   brokerService.handleIOException(ioe);
-                   throw ioe;
+                LOG.error("Cannot write to the journal", ioe);
+                brokerService.handleIOException(ioe);
+                throw ioe;
              }
          }
          throw new IOException("closed");
@@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
          return writeCommand(trace, sync);
      }

+    @Override
      public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
          newPercentUsage = (newPercentUsage / 10) * 10;
          oldPercentUsage = (oldPercentUsage / 10) * 10;
@@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
          return transactionStore;
      }

+    @Override
      public void deleteAllMessages() throws IOException {
          try {
              JournalTrace trace = new JournalTrace();
@@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
          return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
      }

+    @Override
      public void setBrokerName(String brokerName) {
          longTermPersistence.setBrokerName(brokerName);
      }
@@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
          return "JournalPersistenceAdapter(" + longTermPersistence + ")";
      }

+    @Override
      public void setDirectory(File dir) {
          this.directory=dir;
      }
-
+
+    @Override
      public File getDirectory(){
          return directory;
      }
-
+
+    @Override
      public long size(){
          return 0;
      }

+    @Override
      public void setBrokerService(BrokerService brokerService) {
          this.brokerService = brokerService;
          PersistenceAdapter pa = getLongTermPersistence();
@@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
          }
      }

+    @Override
      public long getLastProducerSequenceId(ProducerId id) {
          return -1;
      }

+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        return longTermPersistence.createJobSchedulerStore();
+    }
+
  }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
new file mode 100644
index 0000000..edb2750
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Page;
+
+public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> {
+
+    private int state;
+    private Location lastUpdateLocation;
+    private Page<T> page;
+
+    @Override
+    public Page<T> getPage() {
+        return page;
+    }
+
+    @Override
+    public int getState() {
+        return state;
+    }
+
+    @Override
+    public Location getLastUpdateLocation() {
+        return lastUpdateLocation;
+    }
+
+    @Override
+    public void setPage(Page<T> page) {
+        this.page = page;
+    }
+
+    @Override
+    public void setState(int value) {
+        this.state = value;
+    }
+
+    @Override
+    public void setLastUpdateLocation(Location location) {
+        this.lastUpdateLocation = location;
+    }
+}

.



--
Tim Bish
Sr Software Engineer | RedHat Inc.
[email protected] | www.redhat.com
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/

Reply via email to