http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index 1a08931..5934914 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -19,10 +19,8 @@ package org.apache.activemq.store.kahadb.scheduler; import java.io.DataInput; import java.io.DataOutput; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -30,917 +28,363 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.broker.LockableServiceSupport; +import org.apache.activemq.broker.Locker; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.kahadb.AbstractKahaDBStore; -import org.apache.activemq.store.kahadb.JournalCommand; -import org.apache.activemq.store.kahadb.KahaDBMetaData; -import org.apache.activemq.store.kahadb.Visitor; -import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; -import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; -import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; -import org.apache.activemq.store.kahadb.data.KahaTraceCommand; -import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; -import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.store.SharedFileLocker; +import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; +import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Page; import org.apache.activemq.store.kahadb.disk.page.PageFile; import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; +import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; -import org.apache.activemq.store.kahadb.scheduler.legacy.LegacyStoreReplayer; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore { - - private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); - - private JobSchedulerKahaDBMetaData metaData = new JobSchedulerKahaDBMetaData(this); - private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); - private final Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>(); - private File legacyStoreArchiveDirectory; - - /** - * The Scheduler Token is used to identify base revisions of the Scheduler store. A store - * based on the initial scheduler design will not have this tag in it's meta-data and will - * indicate an update is needed. Later versions of the scheduler can also change this value - * to indicate incompatible store bases which require complete meta-data and journal rewrites - * instead of simpler meta-data updates. - */ - static final UUID SCHEDULER_STORE_TOKEN = UUID.fromString("57ed642b-1ee3-47b3-be6d-b7297d500409"); - - /** - * The default scheduler store version. All new store instance will be given this version and - * earlier versions will be updated to this version. - */ - static final int CURRENT_VERSION = 1; - - @Override - public JobScheduler getJobScheduler(final String name) throws Exception { - this.indexLock.writeLock().lock(); - try { - JobSchedulerImpl result = this.schedulers.get(name); - if (result == null) { - final JobSchedulerImpl js = new JobSchedulerImpl(this); - js.setName(name); - getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - js.createIndexes(tx); - js.load(tx); - metaData.getJobSchedulers().put(tx, name, js); - } - }); - result = js; - this.schedulers.put(name, js); - if (isStarted()) { - result.start(); - } - this.pageFile.flush(); - } - return result; - } finally { - this.indexLock.writeLock().unlock(); +public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore { + static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); + private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; + + public static final int CLOSED_STATE = 1; + public static final int OPEN_STATE = 2; + + private File directory; + PageFile pageFile; + private Journal journal; + protected AtomicLong journalSize = new AtomicLong(0); + private boolean failIfDatabaseIsLocked; + private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; + private boolean enableIndexWriteAsync = false; + MetaData metaData = new MetaData(this); + final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); + Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>(); + + protected class MetaData { + protected MetaData(JobSchedulerStoreImpl store) { + this.store = store; } - } - @Override - public boolean removeJobScheduler(final String name) throws Exception { - boolean result = false; + private final JobSchedulerStoreImpl store; + Page<MetaData> page; + BTreeIndex<Integer, Integer> journalRC; + BTreeIndex<String, JobSchedulerImpl> storedSchedulers; - this.indexLock.writeLock().lock(); - try { - final JobSchedulerImpl js = this.schedulers.remove(name); - result = js != null; - if (result) { - js.stop(); - getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - metaData.getJobSchedulers().remove(tx, name); - js.removeAll(tx); - } - }); - } - } finally { - this.indexLock.writeLock().unlock(); + void createIndexes(Transaction tx) throws IOException { + this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId()); + this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId()); } - return result; - } - - /** - * Sets the directory where the legacy scheduler store files are archived before an - * update attempt is made. Both the legacy index files and the journal files are moved - * to this folder prior to an upgrade attempt. - * - * @param directory - * The directory to move the legacy Scheduler Store files to. - */ - public void setLegacyStoreArchiveDirectory(File directory) { - this.legacyStoreArchiveDirectory = directory; - } - /** - * Gets the directory where the legacy Scheduler Store files will be archived if the - * broker is started and an existing Job Scheduler Store from an old version is detected. - * - * @return the directory where scheduler store legacy files are archived on upgrade. - */ - public File getLegacyStoreArchiveDirectory() { - if (this.legacyStoreArchiveDirectory == null) { - this.legacyStoreArchiveDirectory = new File(getDirectory(), "legacySchedulerStore"); + void load(Transaction tx) throws IOException { + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.storedSchedulers.load(tx); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.load(tx); } - return this.legacyStoreArchiveDirectory.getAbsoluteFile(); - } - - @Override - public void load() throws IOException { - if (opened.compareAndSet(false, true)) { - getJournal().start(); - try { - loadPageFile(); - } catch (UnknownStoreVersionException ex) { - LOG.info("Can't start until store update is performed."); - upgradeFromLegacy(); - // Restart with the updated store - getJournal().start(); - loadPageFile(); - LOG.info("Update from legacy Scheduler store completed successfully."); - } catch (Throwable t) { - LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: {}", t.toString()); - LOG.debug("Index load failure", t); - - // try to recover index - try { - pageFile.unload(); - } catch (Exception ignore) { - } - if (isArchiveCorruptedIndex()) { - pageFile.archive(); - } else { - pageFile.delete(); - } - metaData = new JobSchedulerKahaDBMetaData(this); - pageFile = null; - loadPageFile(); + void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException { + for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { + Entry<String, JobSchedulerImpl> entry = i.next(); + entry.getValue().load(tx); + schedulers.put(entry.getKey(), entry.getValue()); } - startCheckpoint(); - recover(); } - LOG.info("{} started.", this); - } - - @Override - public void unload() throws IOException { - if (opened.compareAndSet(true, false)) { - for (JobSchedulerImpl js : this.schedulers.values()) { - try { - js.stop(); - } catch (Exception e) { - throw new IOException(e); - } - } - this.indexLock.writeLock().lock(); - try { - if (pageFile != null && pageFile.isLoaded()) { - metaData.setState(KahaDBMetaData.CLOSED_STATE); - - if (metaData.getPage() != null) { - pageFile.tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - tx.store(metaData.getPage(), metaDataMarshaller, true); - } - }); - } - } - } finally { - this.indexLock.writeLock().unlock(); - } - checkpointLock.writeLock().lock(); - try { - if (metaData.getPage() != null) { - checkpointUpdate(true); - } - } finally { - checkpointLock.writeLock().unlock(); - } - synchronized (checkpointThreadLock) { - if (checkpointThread != null) { - try { - checkpointThread.join(); - checkpointThread = null; - } catch (InterruptedException e) { - } - } - } - - if (pageFile != null) { - pageFile.unload(); - pageFile = null; - } - if (this.journal != null) { - journal.close(); - journal = null; - } - - metaData = new JobSchedulerKahaDBMetaData(this); + public void read(DataInput is) throws IOException { + this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong()); + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong()); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); } - LOG.info("{} stopped.", this); - } - private void loadPageFile() throws IOException { - this.indexLock.writeLock().lock(); - try { - final PageFile pageFile = getPageFile(); - pageFile.load(); - pageFile.tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - if (pageFile.getPageCount() == 0) { - Page<JobSchedulerKahaDBMetaData> page = tx.allocate(); - assert page.getPageId() == 0; - page.set(metaData); - metaData.setPage(page); - metaData.setState(KahaDBMetaData.CLOSED_STATE); - metaData.initialize(tx); - tx.store(metaData.getPage(), metaDataMarshaller, true); - } else { - Page<JobSchedulerKahaDBMetaData> page = null; - page = tx.load(0, metaDataMarshaller); - metaData = page.get(); - metaData.setPage(page); - } - metaData.load(tx); - metaData.loadScheduler(tx, schedulers); - for (JobSchedulerImpl js : schedulers.values()) { - try { - js.start(); - } catch (Exception e) { - JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e); - } - } - } - }); - - pageFile.flush(); - } finally { - this.indexLock.writeLock().unlock(); + public void write(DataOutput os) throws IOException { + os.writeLong(this.storedSchedulers.getPageId()); + os.writeLong(this.journalRC.getPageId()); } } - private void upgradeFromLegacy() throws IOException { - - journal.close(); - journal = null; - try { - pageFile.unload(); - pageFile = null; - } catch (Exception ignore) {} - - File storeDir = getDirectory().getAbsoluteFile(); - File storeArchiveDir = getLegacyStoreArchiveDirectory(); - - LOG.info("Attempting to move old store files from {} to {}", storeDir, storeArchiveDir); - - // Move only the known store files, locks and other items left in place. - IOHelper.moveFiles(storeDir, storeArchiveDir, new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) { - return true; - } - return false; - } - }); - - // We reset everything to clean state, then we can read from the old - // scheduler store and replay the scheduled jobs into this one as adds. - getJournal().start(); - metaData = new JobSchedulerKahaDBMetaData(this); - pageFile = null; - loadPageFile(); - - LegacyStoreReplayer replayer = new LegacyStoreReplayer(getLegacyStoreArchiveDirectory()); - replayer.load(); - replayer.startReplay(this); - - // Cleanup after replay and store what we've done. - pageFile.tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - tx.store(metaData.getPage(), metaDataMarshaller, true); - } - }); - - checkpointUpdate(true); - getJournal().close(); - getPageFile().unload(); - } - - @Override - protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { - LOG.debug("Job Scheduler Store Checkpoint started."); - - // reflect last update exclusive of current checkpoint - Location lastUpdate = metaData.getLastUpdateLocation(); - metaData.setState(KahaDBMetaData.OPEN_STATE); - tx.store(metaData.getPage(), metaDataMarshaller, true); - pageFile.flush(); - - if (cleanup) { - final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); - final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet); - - LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet); - - if (lastUpdate != null) { - gcCandidateSet.remove(lastUpdate.getDataFileId()); - } - - this.metaData.getJournalRC().visit(tx, new BTreeVisitor<Integer, Integer>() { - - @Override - public void visit(List<Integer> keys, List<Integer> values) { - for (Integer key : keys) { - if (gcCandidateSet.remove(key)) { - LOG.trace("Removed referenced file: {} from GC set", key); - } - } - } - - @Override - public boolean isInterestedInKeysBetween(Integer first, Integer second) { - return true; - } - }); - - LOG.trace("gc candidates after reference check: {}", gcCandidateSet); - - // If there are GC candidates then check the remove command location to see - // if any of them can go or if they must stay in order to ensure proper recover. - // - // A log containing any remove commands must be kept until all the logs with the - // add commands for all the removed jobs have been dropped. - if (!gcCandidateSet.isEmpty()) { - Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx); - List<Integer> orphans = new ArrayList<Integer>(); - while (removals.hasNext()) { - boolean orphanedRemve = true; - Entry<Integer, List<Integer>> entry = removals.next(); - - // If this log is not a GC candidate then there's no need to do a check to rule it out - if (gcCandidateSet.contains(entry.getKey())) { - for (Integer addLocation : entry.getValue()) { - if (completeFileSet.contains(addLocation)) { - orphanedRemve = false; - break; - } - } - - // If it's not orphaned than we can't remove it, otherwise we - // stop tracking it it's log will get deleted on the next check. - if (!orphanedRemve) { - LOG.trace("A remove in log {} has an add still in existance.", entry.getKey()); - gcCandidateSet.remove(entry.getKey()); - } else { - LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey()); - orphans.add(entry.getKey()); - } - } - } - - // Drop all orphaned removes from the tracker. - for (Integer orphan : orphans) { - metaData.getRemoveLocationTracker().remove(tx, orphan); - } - } + class MetaDataMarshaller extends VariableMarshaller<MetaData> { + private final JobSchedulerStoreImpl store; - LOG.trace("gc candidates after removals check: {}", gcCandidateSet); - if (!gcCandidateSet.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cleanup removing the data files: " + gcCandidateSet); - } - journal.removeDataFiles(gcCandidateSet); - } + MetaDataMarshaller(JobSchedulerStoreImpl store) { + this.store = store; } - LOG.debug("Job Scheduler Store Checkpoint complete."); - } + @Override + public MetaData readPayload(DataInput dataIn) throws IOException { + MetaData rc = new MetaData(this.store); + rc.read(dataIn); + return rc; + } - /** - * Adds a reference for the journal log file pointed to by the given Location value. - * - * To prevent log files in the journal that still contain valid data that needs to be - * kept in order to allow for recovery the logs must have active references. Each Job - * scheduler should ensure that the logs are accurately referenced. - * - * @param tx - * The TX under which the update is to be performed. - * @param location - * The location value to update the reference count of. - * - * @throws IOException if an error occurs while updating the journal references table. - */ - protected void incrementJournalCount(Transaction tx, Location location) throws IOException { - int logId = location.getDataFileId(); - Integer val = metaData.getJournalRC().get(tx, logId); - int refCount = val != null ? val.intValue() + 1 : 1; - metaData.getJournalRC().put(tx, logId, refCount); + @Override + public void writePayload(MetaData object, DataOutput dataOut) throws IOException { + object.write(dataOut); + } } - /** - * Removes one reference for the Journal log file indicated in the given Location value. - * - * The references are used to track which log files cannot be GC'd. When the reference count - * on a log file reaches zero the file id is removed from the tracker and the log will be - * removed on the next check point update. - * - * @param tx - * The TX under which the update is to be performed. - * @param location - * The location value to update the reference count of. - * - * @throws IOException if an error occurs while updating the journal references table. - */ - protected void decrementJournalCount(Transaction tx, Location location) throws IOException { - int logId = location.getDataFileId(); - Integer refCount = metaData.getJournalRC().get(tx, logId); - if (refCount != null) { - int refCountValue = refCount; - refCountValue--; - if (refCountValue <= 0) { - metaData.getJournalRC().remove(tx, logId); - } else { - metaData.getJournalRC().put(tx, logId, refCountValue); + class ValueMarshaller extends VariableMarshaller<List<JobLocation>> { + @Override + public List<JobLocation> readPayload(DataInput dataIn) throws IOException { + List<JobLocation> result = new ArrayList<JobLocation>(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + JobLocation jobLocation = new JobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); } + return result; } - } - /** - * Updates the Job removal tracking index with the location of a remove command and the - * original JobLocation entry. - * - * The JobLocation holds the locations in the logs where the add and update commands for - * a job stored. The log file containing the remove command can only be discarded after - * both the add and latest update log files have also been discarded. - * - * @param tx - * The TX under which the update is to be performed. - * @param location - * The location value to reference a remove command. - * @param removedJob - * The original JobLocation instance that holds the add and update locations - * - * @throws IOException if an error occurs while updating the remove location tracker. - */ - protected void referenceRemovedLocation(Transaction tx, Location location, JobLocation removedJob) throws IOException { - int logId = location.getDataFileId(); - List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId); - if (removed == null) { - removed = new ArrayList<Integer>(); + @Override + public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (JobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } } - removed.add(removedJob.getLocation().getDataFileId()); - this.metaData.getRemoveLocationTracker().put(tx, logId, removed); - } - - /** - * Retrieve the scheduled Job's byte blob from the journal. - * - * @param location - * The location of the KahaAddScheduledJobCommand that originated the Job. - * - * @return a ByteSequence containing the payload of the scheduled Job. - * - * @throws IOException if an error occurs while reading the payload value. - */ - protected ByteSequence getPayload(Location location) throws IOException { - KahaAddScheduledJobCommand job = (KahaAddScheduledJobCommand) this.load(location); - Buffer payload = job.getPayload(); - return new ByteSequence(payload.getData(), payload.getOffset(), payload.getLength()); - } - - public void readLockIndex() { - this.indexLock.readLock().lock(); - } - - public void readUnlockIndex() { - this.indexLock.readLock().unlock(); - } - - public void writeLockIndex() { - this.indexLock.writeLock().lock(); - } - - public void writeUnlockIndex() { - this.indexLock.writeLock().unlock(); - } - - @Override - public String toString() { - return "JobSchedulerStore: " + getDirectory(); - } - - @Override - protected String getPageFileName() { - return "scheduleDB"; - } - - @Override - protected File getDefaultDataDirectory() { - return new File(IOHelper.getDefaultDataDirectory(), "delayedDB"); } - private class MetaDataMarshaller extends VariableMarshaller<JobSchedulerKahaDBMetaData> { - + class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> { private final JobSchedulerStoreImpl store; - MetaDataMarshaller(JobSchedulerStoreImpl store) { + JobSchedulerMarshaller(JobSchedulerStoreImpl store) { this.store = store; } @Override - public JobSchedulerKahaDBMetaData readPayload(DataInput dataIn) throws IOException { - JobSchedulerKahaDBMetaData rc = new JobSchedulerKahaDBMetaData(store); - rc.read(dataIn); - return rc; + public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { + JobSchedulerImpl result = new JobSchedulerImpl(this.store); + result.read(dataIn); + return result; } @Override - public void writePayload(JobSchedulerKahaDBMetaData object, DataOutput dataOut) throws IOException { - object.write(dataOut); + public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { + js.write(dataOut); } } - /** - * Called during index recovery to rebuild the index from the last known good location. For - * entries that occur before the last known good position we just ignore then and move on. - * - * @param command - * the command read from the Journal which should be used to update the index. - * @param location - * the location in the index where the command was read. - * @param inDoubtlocation - * the location in the index known to be the last time the index was valid. - * - * @throws IOException if an error occurs while recovering the index. - */ - protected void doRecover(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { - if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { - process(data, location); - } + @Override + public File getDirectory() { + return directory; } - /** - * Called during recovery to allow the store to rebuild from scratch. - * - * @param data - * The command to process, which was read from the Journal. - * @param location - * The location of the command in the Journal. - * - * @throws IOException if an error occurs during command processing. - */ @Override - protected void process(JournalCommand<?> data, final Location location) throws IOException { - data.visit(new Visitor() { - @Override - public void visit(final KahaAddScheduledJobCommand command) throws IOException { - final JobSchedulerImpl scheduler; - - indexLock.writeLock().lock(); - try { - try { - scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); - } catch (Exception e) { - throw new IOException(e); - } - getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - scheduler.process(tx, command, location); - } - }); - - processLocation(location); - } finally { - indexLock.writeLock().unlock(); - } - } - - @Override - public void visit(final KahaRemoveScheduledJobCommand command) throws IOException { - final JobSchedulerImpl scheduler; - - indexLock.writeLock().lock(); - try { - try { - scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); - } catch (Exception e) { - throw new IOException(e); - } - getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - scheduler.process(tx, command, location); - } - }); - - processLocation(location); - } finally { - indexLock.writeLock().unlock(); - } - } - - @Override - public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException { - final JobSchedulerImpl scheduler; - - indexLock.writeLock().lock(); - try { - try { - scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); - } catch (Exception e) { - throw new IOException(e); - } - getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - scheduler.process(tx, command, location); - } - }); - - processLocation(location); - } finally { - indexLock.writeLock().unlock(); - } - } - - @Override - public void visit(final KahaRescheduleJobCommand command) throws IOException { - final JobSchedulerImpl scheduler; - - indexLock.writeLock().lock(); - try { - try { - scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler()); - } catch (Exception e) { - throw new IOException(e); - } - getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - scheduler.process(tx, command, location); - } - }); - - processLocation(location); - } finally { - indexLock.writeLock().unlock(); - } - } - - @Override - public void visit(final KahaDestroySchedulerCommand command) { - try { - removeJobScheduler(command.getScheduler()); - } catch (Exception e) { - LOG.warn("Failed to remove scheduler: {}", command.getScheduler()); - } - - processLocation(location); - } - - @Override - public void visit(KahaTraceCommand command) { - processLocation(location); - } - }); + public void setDirectory(File directory) { + this.directory = directory; } - protected void processLocation(final Location location) { - indexLock.writeLock().lock(); + @Override + public long size() { + if (!isStarted()) { + return 0; + } try { - this.metaData.setLastUpdateLocation(location); - } finally { - indexLock.writeLock().unlock(); + return journalSize.get() + pageFile.getDiskSize(); + } catch (IOException e) { + throw new RuntimeException(e); } } - /** - * We recover from the Journal logs as needed to restore the index. - * - * @throws IllegalStateException - * @throws IOException - */ - private void recover() throws IllegalStateException, IOException { - this.indexLock.writeLock().lock(); - try { - long start = System.currentTimeMillis(); - Location lastIndoubtPosition = getRecoveryPosition(); - Location recoveryPosition = lastIndoubtPosition; - - if (recoveryPosition != null) { - int redoCounter = 0; - LOG.info("Recovering from the journal ..."); - while (recoveryPosition != null) { - JournalCommand<?> message = load(recoveryPosition); - metaData.setLastUpdateLocation(recoveryPosition); - doRecover(message, recoveryPosition, lastIndoubtPosition); - redoCounter++; - recoveryPosition = journal.getNextLocation(recoveryPosition); - if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { - LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter); - } + @Override + public JobScheduler getJobScheduler(final String name) throws Exception { + JobSchedulerImpl result = this.schedulers.get(name); + if (result == null) { + final JobSchedulerImpl js = new JobSchedulerImpl(this); + js.setName(name); + getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + js.createIndexes(tx); + js.load(tx); + metaData.storedSchedulers.put(tx, name, js); } - long end = System.currentTimeMillis(); - LOG.info("Recovery replayed {} operations from the journal in {} seconds.", - redoCounter, ((end - start) / 1000.0f)); + }); + result = js; + this.schedulers.put(name, js); + if (isStarted()) { + result.start(); } + this.pageFile.flush(); + } + return result; + } - // We may have to undo some index updates. - pageFile.tx().execute(new Transaction.Closure<IOException>() { + @Override + synchronized public boolean removeJobScheduler(final String name) throws Exception { + boolean result = false; + final JobSchedulerImpl js = this.schedulers.remove(name); + result = js != null; + if (result) { + js.stop(); + getPageFile().tx().execute(new Transaction.Closure<IOException>() { @Override public void execute(Transaction tx) throws IOException { - recoverIndex(tx); + metaData.storedSchedulers.remove(tx, name); + js.destroy(tx); } }); - - } finally { - this.indexLock.writeLock().unlock(); } + return result; } - private Location getRecoveryPosition() throws IOException { - // This loads the first position and we completely rebuild the index if we - // do not override it with some known recovery start location. - Location result = null; - - if (!isForceRecoverIndex()) { - if (metaData.getLastUpdateLocation() != null) { - result = metaData.getLastUpdateLocation(); - } + @Override + protected synchronized void doStart() throws Exception { + if (this.directory == null) { + this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); } + IOHelper.mkdirs(this.directory); + this.journal = new Journal(); + this.journal.setDirectory(directory); + this.journal.setMaxFileLength(getJournalMaxFileLength()); + this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); + this.journal.setSizeAccumulator(this.journalSize); + this.journal.start(); + this.pageFile = new PageFile(directory, "scheduleDB"); + this.pageFile.setWriteBatchSize(1); + this.pageFile.load(); + + this.pageFile.tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + if (pageFile.getPageCount() == 0) { + Page<MetaData> page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metaData); + metaData.page = page; + metaData.createIndexes(tx); + tx.store(metaData.page, metaDataMarshaller, true); - return journal.getNextLocation(result); - } - - private void recoverIndex(Transaction tx) throws IOException { - long start = System.currentTimeMillis(); - - // It is possible index updates got applied before the journal updates.. - // in that case we need to removed references to Jobs that are not in the journal - final Location lastAppendLocation = journal.getLastAppendLocation(); - long undoCounter = 0; - - // Go through all the jobs in each scheduler and check if any are added after - // the last appended location and remove those. For now we ignore the update - // location since the scheduled job will update itself after the next fire and - // a new update will replace any existing update. - for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) { - Map.Entry<String, JobSchedulerImpl> entry = i.next(); - JobSchedulerImpl scheduler = entry.getValue(); - - List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx); - for (JobLocation job : jobs) { - if (job.getLocation().compareTo(lastAppendLocation) >= 0) { - if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) { - LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId()); - undoCounter++; + } else { + Page<MetaData> page = tx.load(0, metaDataMarshaller); + metaData = page.get(); + metaData.page = page; + } + metaData.load(tx); + metaData.loadScheduler(tx, schedulers); + for (JobSchedulerImpl js : schedulers.values()) { + try { + js.start(); + } catch (Exception e) { + JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e); } } } - } - - if (undoCounter > 0) { - // The rolled back operations are basically in flight journal writes. To avoid getting - // these the end user should do sync writes to the journal. - long end = System.currentTimeMillis(); - LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, ((end - start) / 1000.0f)); - undoCounter = 0; - } - - // Now we check for missing and corrupt journal files. + }); - // 1. Collect the set of all referenced journal files based on the Location of the - // the scheduled jobs and the marked last update field. - HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); - for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) { - Map.Entry<String, JobSchedulerImpl> entry = i.next(); - JobSchedulerImpl scheduler = entry.getValue(); + this.pageFile.flush(); + LOG.info(this + " started"); + } - List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx); - for (JobLocation job : jobs) { - missingJournalFiles.add(job.getLocation().getDataFileId()); - if (job.getLastUpdate() != null) { - missingJournalFiles.add(job.getLastUpdate().getDataFileId()); - } - } + @Override + protected synchronized void doStop(ServiceStopper stopper) throws Exception { + for (JobSchedulerImpl js : this.schedulers.values()) { + js.stop(); } - - // 2. Remove from that set all known data file Id's in the journal and what's left - // is the missing set which will soon also contain the corrupted set. - missingJournalFiles.removeAll(journal.getFileMap().keySet()); - if (!missingJournalFiles.isEmpty()) { - LOG.info("Some journal files are missing: {}", missingJournalFiles); + if (this.pageFile != null) { + this.pageFile.unload(); } + if (this.journal != null) { + journal.close(); + } + LOG.info(this + " stopped"); + } - // 3. Now check all references in the journal logs for corruption and add any - // corrupt journal files to the missing set. - HashSet<Location> corruptedLocations = new HashSet<Location>(); - - if (isCheckForCorruptJournalFiles()) { - Collection<DataFile> dataFiles = journal.getFileMap().values(); - for (DataFile dataFile : dataFiles) { - int id = dataFile.getDataFileId(); - for (long offset : dataFile.getCorruptedBlocks()) { - corruptedLocations.add(new Location(id, (int) offset)); - } - } + synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + Integer val = this.metaData.journalRC.get(tx, logId); + int refCount = val != null ? val.intValue() + 1 : 1; + this.metaData.journalRC.put(tx, logId, refCount); + } - if (!corruptedLocations.isEmpty()) { - LOG.debug("Found some corrupted data blocks in the journal: {}", corruptedLocations.size()); - } + synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + int refCount = this.metaData.journalRC.get(tx, logId); + refCount--; + if (refCount <= 0) { + this.metaData.journalRC.remove(tx, logId); + Set<Integer> set = new HashSet<Integer>(); + set.add(logId); + this.journal.removeDataFiles(set); + } else { + this.metaData.journalRC.put(tx, logId, refCount); } + } - // 4. Now we either fail or we remove all references to missing or corrupt journal - // files from the various JobSchedulerImpl instances. We only remove the Job if - // the initial Add operation is missing when the ignore option is set, the updates - // could be lost but that's price you pay when ignoring the missing logs. - if (!missingJournalFiles.isEmpty() || !corruptedLocations.isEmpty()) { - if (!isIgnoreMissingJournalfiles()) { - throw new IOException("Detected missing/corrupt journal files."); - } + synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException { + ByteSequence result = null; + result = this.journal.read(location); + return result; + } - // Remove all Jobs that reference an Location that is either missing or corrupt. - undoCounter = removeJobsInMissingOrCorruptJounralFiles(tx, missingJournalFiles, corruptedLocations); + synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { + return this.journal.write(payload, sync); + } - // Clean up the Journal Reference count Map. - removeJournalRCForMissingFiles(tx, missingJournalFiles); - } + PageFile getPageFile() { + this.pageFile.isLoaded(); + return this.pageFile; + } - if (undoCounter > 0) { - long end = System.currentTimeMillis(); - LOG.info("Detected missing/corrupt journal files. Dropped {} jobs from the " + - "index in {} seconds.", undoCounter, ((end - start) / 1000.0f)); - } + public boolean isFailIfDatabaseIsLocked() { + return failIfDatabaseIsLocked; } - private void removeJournalRCForMissingFiles(Transaction tx, Set<Integer> missing) throws IOException { - List<Integer> matches = new ArrayList<Integer>(); + public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { + this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; + } - Iterator<Entry<Integer, Integer>> references = metaData.getJournalRC().iterator(tx); - while (references.hasNext()) { - int dataFileId = references.next().getKey(); - if (missing.contains(dataFileId)) { - matches.add(dataFileId); - } - } + public int getJournalMaxFileLength() { + return journalMaxFileLength; + } - for (Integer match : matches) { - metaData.getJournalRC().remove(tx, match); - } + public void setJournalMaxFileLength(int journalMaxFileLength) { + this.journalMaxFileLength = journalMaxFileLength; } - private int removeJobsInMissingOrCorruptJounralFiles(Transaction tx, Set<Integer> missing, Set<Location> corrupted) throws IOException { - int removed = 0; + public int getJournalMaxWriteBatchSize() { + return journalMaxWriteBatchSize; + } - // Remove Jobs that reference missing or corrupt files. - // Remove Reference counts to missing or corrupt files. - // Remove and remove command markers to missing or corrupt files. - for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) { - Map.Entry<String, JobSchedulerImpl> entry = i.next(); - JobSchedulerImpl scheduler = entry.getValue(); + public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { + this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; + } - List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx); - for (JobLocation job : jobs) { + public boolean isEnableIndexWriteAsync() { + return enableIndexWriteAsync; + } - // Remove all jobs in missing log files. - if (missing.contains(job.getLocation().getDataFileId())) { - scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime()); - removed++; - continue; - } + public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { + this.enableIndexWriteAsync = enableIndexWriteAsync; + } - // Remove all jobs in corrupted parts of log files. - if (corrupted.contains(job.getLocation())) { - scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime()); - removed++; - } - } - } + @Override + public String toString() { + return "JobSchedulerStore:" + this.directory; + } - return removed; + @Override + public Locker createDefaultLocker() throws IOException { + SharedFileLocker locker = new SharedFileLocker(); + locker.setDirectory(this.getDirectory()); + return locker; + } + + @Override + public void init() throws Exception { } }
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java deleted file mode 100644 index 5146d84..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.apache.activemq.store.kahadb.scheduler; - -import java.io.IOException; - -public class UnknownStoreVersionException extends IOException { - - private static final long serialVersionUID = -8544753506151157145L; - - private final String token; - - public UnknownStoreVersionException(Throwable cause) { - super(cause); - this.token = ""; - } - - public UnknownStoreVersionException(String token) { - super("Failed to load Store, found unknown store token: " + token); - this.token = token; - } - - public String getToken() { - return this.token; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java deleted file mode 100644 index 2562f50..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.scheduler.legacy; - -import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.util.ByteSequence; - -/** - * Legacy version Job and Job payload wrapper. Allows for easy replay of stored - * legacy jobs into a new JobSchedulerStoreImpl intsance. - */ -final class LegacyJobImpl { - - private final LegacyJobLocation jobLocation; - private final Buffer payload; - - protected LegacyJobImpl(LegacyJobLocation location, ByteSequence payload) { - this.jobLocation = location; - this.payload = new Buffer(payload.data, payload.offset, payload.length); - } - - public String getJobId() { - return this.jobLocation.getJobId(); - } - - public Buffer getPayload() { - return this.payload; - } - - public long getPeriod() { - return this.jobLocation.getPeriod(); - } - - public int getRepeat() { - return this.jobLocation.getRepeat(); - } - - public long getDelay() { - return this.jobLocation.getDelay(); - } - - public String getCronEntry() { - return this.jobLocation.getCronEntry(); - } - - public long getNextExecutionTime() { - return this.jobLocation.getNextTime(); - } - - public long getStartTime() { - return this.jobLocation.getStartTime(); - } - - @Override - public String toString() { - return this.jobLocation.toString(); - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java deleted file mode 100644 index 8437064..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java +++ /dev/null @@ -1,296 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.scheduler.legacy; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; - -final class LegacyJobLocation { - - private String jobId; - private int repeat; - private long startTime; - private long delay; - private long nextTime; - private long period; - private String cronEntry; - private final Location location; - - public LegacyJobLocation(Location location) { - this.location = location; - } - - public LegacyJobLocation() { - this(new Location()); - } - - public void readExternal(DataInput in) throws IOException { - this.jobId = in.readUTF(); - this.repeat = in.readInt(); - this.startTime = in.readLong(); - this.delay = in.readLong(); - this.nextTime = in.readLong(); - this.period = in.readLong(); - this.cronEntry = in.readUTF(); - this.location.readExternal(in); - } - - public void writeExternal(DataOutput out) throws IOException { - out.writeUTF(this.jobId); - out.writeInt(this.repeat); - out.writeLong(this.startTime); - out.writeLong(this.delay); - out.writeLong(this.nextTime); - out.writeLong(this.period); - if (this.cronEntry == null) { - this.cronEntry = ""; - } - out.writeUTF(this.cronEntry); - this.location.writeExternal(out); - } - - /** - * @return the jobId - */ - public String getJobId() { - return this.jobId; - } - - /** - * @param jobId - * the jobId to set - */ - public void setJobId(String jobId) { - this.jobId = jobId; - } - - /** - * @return the repeat - */ - public int getRepeat() { - return this.repeat; - } - - /** - * @param repeat - * the repeat to set - */ - public void setRepeat(int repeat) { - this.repeat = repeat; - } - - /** - * @return the start - */ - public long getStartTime() { - return this.startTime; - } - - /** - * @param start - * the start to set - */ - public void setStartTime(long start) { - this.startTime = start; - } - - /** - * @return the nextTime - */ - public synchronized long getNextTime() { - return this.nextTime; - } - - /** - * @param nextTime - * the nextTime to set - */ - public synchronized void setNextTime(long nextTime) { - this.nextTime = nextTime; - } - - /** - * @return the period - */ - public long getPeriod() { - return this.period; - } - - /** - * @param period - * the period to set - */ - public void setPeriod(long period) { - this.period = period; - } - - /** - * @return the cronEntry - */ - public synchronized String getCronEntry() { - return this.cronEntry; - } - - /** - * @param cronEntry - * the cronEntry to set - */ - public synchronized void setCronEntry(String cronEntry) { - this.cronEntry = cronEntry; - } - - /** - * @return if this JobLocation represents a cron entry. - */ - public boolean isCron() { - return getCronEntry() != null && getCronEntry().length() > 0; - } - - /** - * @return the delay - */ - public long getDelay() { - return this.delay; - } - - /** - * @param delay - * the delay to set - */ - public void setDelay(long delay) { - this.delay = delay; - } - - /** - * @return the location - */ - public Location getLocation() { - return this.location; - } - - @Override - public String toString() { - return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + - ", delay=" + delay + ", period=" + period + - ", repeat=" + repeat + ", nextTime=" + new Date(nextTime) + "]"; - } - - static class JobLocationMarshaller extends VariableMarshaller<List<LegacyJobLocation>> { - static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller(); - - @Override - public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException { - List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - LegacyJobLocation jobLocation = new LegacyJobLocation(); - jobLocation.readExternal(dataIn); - result.add(jobLocation); - } - return result; - } - - @Override - public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (LegacyJobLocation jobLocation : value) { - jobLocation.writeExternal(dataOut); - } - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((cronEntry == null) ? 0 : cronEntry.hashCode()); - result = prime * result + (int) (delay ^ (delay >>> 32)); - result = prime * result + ((jobId == null) ? 0 : jobId.hashCode()); - result = prime * result + ((location == null) ? 0 : location.hashCode()); - result = prime * result + (int) (nextTime ^ (nextTime >>> 32)); - result = prime * result + (int) (period ^ (period >>> 32)); - result = prime * result + repeat; - result = prime * result + (int) (startTime ^ (startTime >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj == null) { - return false; - } - - if (getClass() != obj.getClass()) { - return false; - } - - LegacyJobLocation other = (LegacyJobLocation) obj; - - if (cronEntry == null) { - if (other.cronEntry != null) { - return false; - } - } else if (!cronEntry.equals(other.cronEntry)) { - return false; - } - - if (delay != other.delay) { - return false; - } - - if (jobId == null) { - if (other.jobId != null) - return false; - } else if (!jobId.equals(other.jobId)) { - return false; - } - - if (location == null) { - if (other.location != null) { - return false; - } - } else if (!location.equals(other.location)) { - return false; - } - - if (nextTime != other.nextTime) { - return false; - } - if (period != other.period) { - return false; - } - if (repeat != other.repeat) { - return false; - } - if (startTime != other.startTime) { - return false; - } - - return true; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java deleted file mode 100644 index 687ffd7..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.scheduler.legacy; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; -import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.ServiceSupport; - -/** - * Read-only view of a stored legacy JobScheduler instance. - */ -final class LegacyJobSchedulerImpl extends ServiceSupport { - - private final LegacyJobSchedulerStoreImpl store; - private String name; - private BTreeIndex<Long, List<LegacyJobLocation>> index; - - LegacyJobSchedulerImpl(LegacyJobSchedulerStoreImpl store) { - this.store = store; - } - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return this.name; - } - - /** - * Returns the next time that a job would be scheduled to run. - * - * @return time of next scheduled job to run. - * - * @throws IOException if an error occurs while fetching the time. - */ - public long getNextScheduleTime() throws IOException { - Map.Entry<Long, List<LegacyJobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); - return first != null ? first.getKey() : -1l; - } - - /** - * Gets the list of the next batch of scheduled jobs in the store. - * - * @return a list of the next jobs that will run. - * - * @throws IOException if an error occurs while fetching the jobs list. - */ - public List<LegacyJobImpl> getNextScheduleJobs() throws IOException { - final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>(); - - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Map.Entry<Long, List<LegacyJobLocation>> first = index.getFirst(store.getPageFile().tx()); - if (first != null) { - for (LegacyJobLocation jl : first.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - LegacyJobImpl job = new LegacyJobImpl(jl, bs); - result.add(job); - } - } - } - }); - return result; - } - - /** - * Gets a list of all scheduled jobs in this store. - * - * @return a list of all the currently scheduled jobs in this store. - * - * @throws IOException if an error occurs while fetching the list of jobs. - */ - public List<LegacyJobImpl> getAllJobs() throws IOException { - final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>(); - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx()); - while (iter.hasNext()) { - Map.Entry<Long, List<LegacyJobLocation>> next = iter.next(); - if (next != null) { - for (LegacyJobLocation jl : next.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - LegacyJobImpl job = new LegacyJobImpl(jl, bs); - result.add(job); - } - } else { - break; - } - } - } - }); - return result; - } - - /** - * Gets a list of all scheduled jobs that exist between the given start and end time. - * - * @param start - * The start time to look for scheduled jobs. - * @param finish - * The end time to stop looking for scheduled jobs. - * - * @return a list of all scheduled jobs that would run between the given start and end time. - * - * @throws IOException if an error occurs while fetching the list of jobs. - */ - public List<LegacyJobImpl> getAllJobs(final long start, final long finish) throws IOException { - final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>(); - this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx(), start); - while (iter.hasNext()) { - Map.Entry<Long, List<LegacyJobLocation>> next = iter.next(); - if (next != null && next.getKey().longValue() <= finish) { - for (LegacyJobLocation jl : next.getValue()) { - ByteSequence bs = getPayload(jl.getLocation()); - LegacyJobImpl job = new LegacyJobImpl(jl, bs); - result.add(job); - } - } else { - break; - } - } - } - }); - return result; - } - - ByteSequence getPayload(Location location) throws IllegalStateException, IOException { - return this.store.getPayload(location); - } - - @Override - public String toString() { - return "LegacyJobScheduler: " + this.name; - } - - @Override - protected void doStart() throws Exception { - } - - @Override - protected void doStop(ServiceStopper stopper) throws Exception { - } - - void createIndexes(Transaction tx) throws IOException { - this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), tx.allocate().getPageId()); - } - - void load(Transaction tx) throws IOException { - this.index.setKeyMarshaller(LongMarshaller.INSTANCE); - this.index.setValueMarshaller(ValueMarshaller.INSTANCE); - this.index.load(tx); - } - - void read(DataInput in) throws IOException { - this.name = in.readUTF(); - this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), in.readLong()); - this.index.setKeyMarshaller(LongMarshaller.INSTANCE); - this.index.setValueMarshaller(ValueMarshaller.INSTANCE); - } - - public void write(DataOutput out) throws IOException { - out.writeUTF(name); - out.writeLong(this.index.getPageId()); - } - - static class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> { - static ValueMarshaller INSTANCE = new ValueMarshaller(); - - @Override - public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException { - List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - LegacyJobLocation jobLocation = new LegacyJobLocation(); - jobLocation.readExternal(dataIn); - result.add(jobLocation); - } - return result; - } - - @Override - public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (LegacyJobLocation jobLocation : value) { - jobLocation.writeExternal(dataOut); - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java deleted file mode 100644 index acbd4e7..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.scheduler.legacy; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; -import org.apache.activemq.store.kahadb.disk.journal.Journal; -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.Page; -import org.apache.activemq.store.kahadb.disk.page.PageFile; -import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; -import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; -import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.LockFile; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.ServiceSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Read-only view of a legacy JobSchedulerStore implementation. - */ -final class LegacyJobSchedulerStoreImpl extends ServiceSupport { - - static final Logger LOG = LoggerFactory.getLogger(LegacyJobSchedulerStoreImpl.class); - - private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; - - private File directory; - private PageFile pageFile; - private Journal journal; - private LockFile lockFile; - private final AtomicLong journalSize = new AtomicLong(0); - private boolean failIfDatabaseIsLocked; - private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; - private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; - private boolean enableIndexWriteAsync = false; - private MetaData metaData = new MetaData(this); - private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); - private final Map<String, LegacyJobSchedulerImpl> schedulers = new HashMap<String, LegacyJobSchedulerImpl>(); - - protected class MetaData { - protected MetaData(LegacyJobSchedulerStoreImpl store) { - this.store = store; - } - - private final LegacyJobSchedulerStoreImpl store; - Page<MetaData> page; - BTreeIndex<Integer, Integer> journalRC; - BTreeIndex<String, LegacyJobSchedulerImpl> storedSchedulers; - - void createIndexes(Transaction tx) throws IOException { - this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, tx.allocate().getPageId()); - this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId()); - } - - void load(Transaction tx) throws IOException { - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.storedSchedulers.load(tx); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.load(tx); - } - - void loadScheduler(Transaction tx, Map<String, LegacyJobSchedulerImpl> schedulers) throws IOException { - for (Iterator<Entry<String, LegacyJobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { - Entry<String, LegacyJobSchedulerImpl> entry = i.next(); - entry.getValue().load(tx); - schedulers.put(entry.getKey(), entry.getValue()); - } - } - - public void read(DataInput is) throws IOException { - this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, is.readLong()); - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong()); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); - } - - public void write(DataOutput os) throws IOException { - os.writeLong(this.storedSchedulers.getPageId()); - os.writeLong(this.journalRC.getPageId()); - } - } - - class MetaDataMarshaller extends VariableMarshaller<MetaData> { - private final LegacyJobSchedulerStoreImpl store; - - MetaDataMarshaller(LegacyJobSchedulerStoreImpl store) { - this.store = store; - } - - @Override - public MetaData readPayload(DataInput dataIn) throws IOException { - MetaData rc = new MetaData(this.store); - rc.read(dataIn); - return rc; - } - - @Override - public void writePayload(MetaData object, DataOutput dataOut) throws IOException { - object.write(dataOut); - } - } - - class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> { - @Override - public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException { - List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - LegacyJobLocation jobLocation = new LegacyJobLocation(); - jobLocation.readExternal(dataIn); - result.add(jobLocation); - } - return result; - } - - @Override - public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (LegacyJobLocation jobLocation : value) { - jobLocation.writeExternal(dataOut); - } - } - } - - class JobSchedulerMarshaller extends VariableMarshaller<LegacyJobSchedulerImpl> { - private final LegacyJobSchedulerStoreImpl store; - - JobSchedulerMarshaller(LegacyJobSchedulerStoreImpl store) { - this.store = store; - } - - @Override - public LegacyJobSchedulerImpl readPayload(DataInput dataIn) throws IOException { - LegacyJobSchedulerImpl result = new LegacyJobSchedulerImpl(this.store); - result.read(dataIn); - return result; - } - - @Override - public void writePayload(LegacyJobSchedulerImpl js, DataOutput dataOut) throws IOException { - js.write(dataOut); - } - } - - public File getDirectory() { - return directory; - } - - public void setDirectory(File directory) { - this.directory = directory; - } - - public long size() { - if (!isStarted()) { - return 0; - } - try { - return journalSize.get() + pageFile.getDiskSize(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Returns the named Job Scheduler if it exists, otherwise throws an exception. - * - * @param name - * The name of the scheduler that is to be returned. - * - * @return the named scheduler if it exists. - * - * @throws Exception if the named scheduler does not exist in this store. - */ - public LegacyJobSchedulerImpl getJobScheduler(final String name) throws Exception { - LegacyJobSchedulerImpl result = this.schedulers.get(name); - if (result == null) { - throw new NoSuchElementException("No such Job Scheduler in this store: " + name); - } - return result; - } - - /** - * Returns the names of all the schedulers that exist in this scheduler store. - * - * @return a set of names of all scheduler instances in this store. - * - * @throws Exception if an error occurs while collecting the scheduler names. - */ - public Set<String> getJobSchedulerNames() throws Exception { - Set<String> names = Collections.emptySet(); - - if (!schedulers.isEmpty()) { - return this.schedulers.keySet(); - } - - return names; - } - - @Override - protected void doStart() throws Exception { - if (this.directory == null) { - this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); - } - IOHelper.mkdirs(this.directory); - lock(); - this.journal = new Journal(); - this.journal.setDirectory(directory); - this.journal.setMaxFileLength(getJournalMaxFileLength()); - this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); - this.journal.setSizeAccumulator(this.journalSize); - this.journal.start(); - this.pageFile = new PageFile(directory, "scheduleDB"); - this.pageFile.setWriteBatchSize(1); - this.pageFile.load(); - - this.pageFile.tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - if (pageFile.getPageCount() == 0) { - Page<MetaData> page = tx.allocate(); - assert page.getPageId() == 0; - page.set(metaData); - metaData.page = page; - metaData.createIndexes(tx); - tx.store(metaData.page, metaDataMarshaller, true); - - } else { - Page<MetaData> page = tx.load(0, metaDataMarshaller); - metaData = page.get(); - metaData.page = page; - } - metaData.load(tx); - metaData.loadScheduler(tx, schedulers); - for (LegacyJobSchedulerImpl js : schedulers.values()) { - try { - js.start(); - } catch (Exception e) { - LegacyJobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e); - } - } - } - }); - - this.pageFile.flush(); - LOG.info(this + " started"); - } - - @Override - protected void doStop(ServiceStopper stopper) throws Exception { - for (LegacyJobSchedulerImpl js : this.schedulers.values()) { - js.stop(); - } - if (this.pageFile != null) { - this.pageFile.unload(); - } - if (this.journal != null) { - journal.close(); - } - if (this.lockFile != null) { - this.lockFile.unlock(); - } - this.lockFile = null; - LOG.info(this + " stopped"); - } - - ByteSequence getPayload(Location location) throws IllegalStateException, IOException { - ByteSequence result = null; - result = this.journal.read(location); - return result; - } - - Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { - return this.journal.write(payload, sync); - } - - private void lock() throws IOException { - if (lockFile == null) { - File lockFileName = new File(directory, "lock"); - lockFile = new LockFile(lockFileName, true); - if (failIfDatabaseIsLocked) { - lockFile.lock(); - } else { - while (true) { - try { - lockFile.lock(); - break; - } catch (IOException e) { - LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) - + " seconds for the database to be unlocked. Reason: " + e); - try { - Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); - } catch (InterruptedException e1) { - } - } - } - } - } - } - - PageFile getPageFile() { - this.pageFile.isLoaded(); - return this.pageFile; - } - - public boolean isFailIfDatabaseIsLocked() { - return failIfDatabaseIsLocked; - } - - public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { - this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; - } - - public int getJournalMaxFileLength() { - return journalMaxFileLength; - } - - public void setJournalMaxFileLength(int journalMaxFileLength) { - this.journalMaxFileLength = journalMaxFileLength; - } - - public int getJournalMaxWriteBatchSize() { - return journalMaxWriteBatchSize; - } - - public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { - this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; - } - - public boolean isEnableIndexWriteAsync() { - return enableIndexWriteAsync; - } - - public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { - this.enableIndexWriteAsync = enableIndexWriteAsync; - } - - @Override - public String toString() { - return "LegacyJobSchedulerStore:" + this.directory; - } -}