http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java new file mode 100644 index 0000000..6003c87 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java @@ -0,0 +1,745 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.kahadb; + +import java.io.File; +import java.io.IOException; +import java.util.Date; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.activemq.broker.LockableServiceSupport; +import org.apache.activemq.broker.Locker; +import org.apache.activemq.store.SharedFileLocker; +import org.apache.activemq.store.kahadb.data.KahaEntryType; +import org.apache.activemq.store.kahadb.data.KahaTraceCommand; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.DataByteArrayInputStream; +import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.ServiceStopper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractKahaDBStore extends LockableServiceSupport { + + static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class); + + public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; + public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); + + protected File directory; + protected PageFile pageFile; + protected Journal journal; + protected AtomicLong journalSize = new AtomicLong(0); + protected boolean failIfDatabaseIsLocked; + protected long checkpointInterval = 5*1000; + protected long cleanupInterval = 30*1000; + protected boolean checkForCorruptJournalFiles = false; + protected boolean checksumJournalFiles = true; + protected boolean forceRecoverIndex = false; + protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; + protected boolean archiveCorruptedIndex = false; + protected boolean enableIndexWriteAsync = false; + protected boolean enableJournalDiskSyncs = false; + protected boolean deleteAllJobs = false; + protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + protected boolean useIndexLFRUEviction = false; + protected float indexLFUEvictionFactor = 0.2f; + protected boolean ignoreMissingJournalfiles = false; + protected int indexCacheSize = 1000; + protected boolean enableIndexDiskSyncs = true; + protected boolean enableIndexRecoveryFile = true; + protected boolean enableIndexPageCaching = true; + protected boolean archiveDataLogs; + protected boolean purgeStoreOnStartup; + protected File directoryArchive; + + protected AtomicBoolean opened = new AtomicBoolean(); + protected Thread checkpointThread; + protected final Object checkpointThreadLock = new Object(); + protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); + protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); + + /** + * @return the name to give this store's PageFile instance. + */ + protected abstract String getPageFileName(); + + /** + * @return the location of the data directory if no set by configuration. + */ + protected abstract File getDefaultDataDirectory(); + + /** + * Loads the store from disk. + * + * Based on configuration this method can either load an existing store or it can purge + * an existing store and start in a clean state. + * + * @throws IOException if an error occurs during the load. + */ + public abstract void load() throws IOException; + + /** + * Unload the state of the Store to disk and shuts down all resources assigned to this + * KahaDB store implementation. + * + * @throws IOException if an error occurs during the store unload. + */ + public abstract void unload() throws IOException; + + @Override + protected void doStart() throws Exception { + this.indexLock.writeLock().lock(); + if (getDirectory() == null) { + setDirectory(getDefaultDataDirectory()); + } + IOHelper.mkdirs(getDirectory()); + try { + if (isPurgeStoreOnStartup()) { + getJournal().start(); + getJournal().delete(); + getJournal().close(); + journal = null; + getPageFile().delete(); + LOG.info("{} Persistence store purged.", this); + setPurgeStoreOnStartup(false); + } + + load(); + store(new KahaTraceCommand().setMessage("LOADED " + new Date())); + } finally { + this.indexLock.writeLock().unlock(); + } + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + unload(); + } + + public PageFile getPageFile() { + if (pageFile == null) { + pageFile = createPageFile(); + } + return pageFile; + } + + public Journal getJournal() throws IOException { + if (journal == null) { + journal = createJournal(); + } + return journal; + } + + public File getDirectory() { + return directory; + } + + public void setDirectory(File directory) { + this.directory = directory; + } + + public boolean isArchiveCorruptedIndex() { + return archiveCorruptedIndex; + } + + public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { + this.archiveCorruptedIndex = archiveCorruptedIndex; + } + + public boolean isFailIfDatabaseIsLocked() { + return failIfDatabaseIsLocked; + } + + public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { + this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; + } + + public boolean isCheckForCorruptJournalFiles() { + return checkForCorruptJournalFiles; + } + + public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { + this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; + } + + public long getCheckpointInterval() { + return checkpointInterval; + } + + public void setCheckpointInterval(long checkpointInterval) { + this.checkpointInterval = checkpointInterval; + } + + public long getCleanupInterval() { + return cleanupInterval; + } + + public void setCleanupInterval(long cleanupInterval) { + this.cleanupInterval = cleanupInterval; + } + + public boolean isChecksumJournalFiles() { + return checksumJournalFiles; + } + + public void setChecksumJournalFiles(boolean checksumJournalFiles) { + this.checksumJournalFiles = checksumJournalFiles; + } + + public boolean isForceRecoverIndex() { + return forceRecoverIndex; + } + + public void setForceRecoverIndex(boolean forceRecoverIndex) { + this.forceRecoverIndex = forceRecoverIndex; + } + + public int getJournalMaxFileLength() { + return journalMaxFileLength; + } + + public void setJournalMaxFileLength(int journalMaxFileLength) { + this.journalMaxFileLength = journalMaxFileLength; + } + + public int getJournalMaxWriteBatchSize() { + return journalMaxWriteBatchSize; + } + + public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { + this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; + } + + public boolean isEnableIndexWriteAsync() { + return enableIndexWriteAsync; + } + + public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { + this.enableIndexWriteAsync = enableIndexWriteAsync; + } + + public boolean isEnableJournalDiskSyncs() { + return enableJournalDiskSyncs; + } + + public void setEnableJournalDiskSyncs(boolean syncWrites) { + this.enableJournalDiskSyncs = syncWrites; + } + + public boolean isDeleteAllJobs() { + return deleteAllJobs; + } + + public void setDeleteAllJobs(boolean deleteAllJobs) { + this.deleteAllJobs = deleteAllJobs; + } + + /** + * @return the archiveDataLogs + */ + public boolean isArchiveDataLogs() { + return this.archiveDataLogs; + } + + /** + * @param archiveDataLogs the archiveDataLogs to set + */ + public void setArchiveDataLogs(boolean archiveDataLogs) { + this.archiveDataLogs = archiveDataLogs; + } + + /** + * @return the directoryArchive + */ + public File getDirectoryArchive() { + return this.directoryArchive; + } + + /** + * @param directoryArchive the directoryArchive to set + */ + public void setDirectoryArchive(File directoryArchive) { + this.directoryArchive = directoryArchive; + } + + public int getIndexCacheSize() { + return indexCacheSize; + } + + public void setIndexCacheSize(int indexCacheSize) { + this.indexCacheSize = indexCacheSize; + } + + public int getIndexWriteBatchSize() { + return indexWriteBatchSize; + } + + public void setIndexWriteBatchSize(int indexWriteBatchSize) { + this.indexWriteBatchSize = indexWriteBatchSize; + } + + public boolean isUseIndexLFRUEviction() { + return useIndexLFRUEviction; + } + + public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { + this.useIndexLFRUEviction = useIndexLFRUEviction; + } + + public float getIndexLFUEvictionFactor() { + return indexLFUEvictionFactor; + } + + public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { + this.indexLFUEvictionFactor = indexLFUEvictionFactor; + } + + public boolean isEnableIndexDiskSyncs() { + return enableIndexDiskSyncs; + } + + public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { + this.enableIndexDiskSyncs = enableIndexDiskSyncs; + } + + public boolean isEnableIndexRecoveryFile() { + return enableIndexRecoveryFile; + } + + public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { + this.enableIndexRecoveryFile = enableIndexRecoveryFile; + } + + public boolean isEnableIndexPageCaching() { + return enableIndexPageCaching; + } + + public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { + this.enableIndexPageCaching = enableIndexPageCaching; + } + + public boolean isPurgeStoreOnStartup() { + return this.purgeStoreOnStartup; + } + + public void setPurgeStoreOnStartup(boolean purge) { + this.purgeStoreOnStartup = purge; + } + + public boolean isIgnoreMissingJournalfiles() { + return ignoreMissingJournalfiles; + } + + public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { + this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; + } + + public long size() { + if (!isStarted()) { + return 0; + } + try { + return journalSize.get() + pageFile.getDiskSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Locker createDefaultLocker() throws IOException { + SharedFileLocker locker = new SharedFileLocker(); + locker.setDirectory(this.getDirectory()); + return locker; + } + + @Override + public void init() throws Exception { + } + + /** + * Store a command in the Journal and process to update the Store index. + * + * @param command + * The specific JournalCommand to store and process. + * + * @returns the Location where the data was written in the Journal. + * + * @throws IOException if an error occurs storing or processing the command. + */ + public Location store(JournalCommand<?> command) throws IOException { + return store(command, isEnableIndexDiskSyncs(), null, null, null); + } + + /** + * Store a command in the Journal and process to update the Store index. + * + * @param command + * The specific JournalCommand to store and process. + * @param sync + * Should the store operation be done synchronously. (ignored if completion passed). + * + * @returns the Location where the data was written in the Journal. + * + * @throws IOException if an error occurs storing or processing the command. + */ + public Location store(JournalCommand<?> command, boolean sync) throws IOException { + return store(command, sync, null, null, null); + } + + /** + * Store a command in the Journal and process to update the Store index. + * + * @param command + * The specific JournalCommand to store and process. + * @param onJournalStoreComplete + * The Runnable to call when the Journal write operation completes. + * + * @returns the Location where the data was written in the Journal. + * + * @throws IOException if an error occurs storing or processing the command. + */ + public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException { + return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete); + } + + /** + * Store a command in the Journal and process to update the Store index. + * + * @param command + * The specific JournalCommand to store and process. + * @param sync + * Should the store operation be done synchronously. (ignored if completion passed). + * @param before + * The Runnable instance to execute before performing the store and process operation. + * @param after + * The Runnable instance to execute after performing the store and process operation. + * + * @returns the Location where the data was written in the Journal. + * + * @throws IOException if an error occurs storing or processing the command. + */ + public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException { + return store(command, sync, before, after, null); + } + + /** + * All updated are are funneled through this method. The updates are converted to a + * JournalMessage which is logged to the journal and then the data from the JournalMessage + * is used to update the index just like it would be done during a recovery process. + * + * @param command + * The specific JournalCommand to store and process. + * @param sync + * Should the store operation be done synchronously. (ignored if completion passed). + * @param before + * The Runnable instance to execute before performing the store and process operation. + * @param after + * The Runnable instance to execute after performing the store and process operation. + * @param onJournalStoreComplete + * Callback to be run when the journal write operation is complete. + * + * @returns the Location where the data was written in the Journal. + * + * @throws IOException if an error occurs storing or processing the command. + */ + public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException { + try { + + if (before != null) { + before.run(); + } + + ByteSequence sequence = toByteSequence(command); + Location location; + checkpointLock.readLock().lock(); + try { + + long start = System.currentTimeMillis(); + location = onJournalStoreComplete == null ? journal.write(sequence, sync) : + journal.write(sequence, onJournalStoreComplete); + long start2 = System.currentTimeMillis(); + + process(command, location); + + long end = System.currentTimeMillis(); + if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { + LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms", + (start2-start), (end-start2)); + } + } finally { + checkpointLock.readLock().unlock(); + } + + if (after != null) { + after.run(); + } + + if (checkpointThread != null && !checkpointThread.isAlive()) { + startCheckpoint(); + } + return location; + } catch (IOException ioe) { + LOG.error("KahaDB failed to store to Journal", ioe); + if (brokerService != null) { + brokerService.handleIOException(ioe); + } + throw ioe; + } + } + + /** + * Loads a previously stored JournalMessage + * + * @param location + * The location of the journal command to read. + * + * @return a new un-marshaled JournalCommand instance. + * + * @throws IOException if an error occurs reading the stored command. + */ + protected JournalCommand<?> load(Location location) throws IOException { + ByteSequence data = journal.read(location); + DataByteArrayInputStream is = new DataByteArrayInputStream(data); + byte readByte = is.readByte(); + KahaEntryType type = KahaEntryType.valueOf(readByte); + if (type == null) { + try { + is.close(); + } catch (IOException e) { + } + throw new IOException("Could not load journal record. Invalid location: " + location); + } + JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); + message.mergeFramed(is); + return message; + } + + /** + * Process a stored or recovered JournalCommand instance and update the DB Index with the + * state changes that this command produces. This can be called either as a new DB operation + * or as a replay during recovery operations. + * + * @param command + * The JournalCommand to process. + * @param location + * The location in the Journal where the command was written or read from. + */ + protected abstract void process(JournalCommand<?> command, Location location) throws IOException; + + /** + * Perform a checkpoint operation with optional cleanup. + * + * Called by the checkpoint background thread periodically to initiate a checkpoint operation + * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no + * longer needed journal log files etc. + * + * @param cleanup + * Should the method do a simple checkpoint or also perform a journal cleanup. + * + * @throws IOException if an error occurs during the checkpoint operation. + */ + protected void checkpointUpdate(final boolean cleanup) throws IOException { + checkpointLock.writeLock().lock(); + try { + this.indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, cleanup); + } + }); + } finally { + this.indexLock.writeLock().unlock(); + } + + } finally { + checkpointLock.writeLock().unlock(); + } + } + + /** + * Perform the checkpoint update operation. If the cleanup flag is true then the + * operation should also purge any unused Journal log files. + * + * This method must always be called with the checkpoint and index write locks held. + * + * @param tx + * The TX under which to perform the checkpoint update. + * @param cleanup + * Should the checkpoint also do unused Journal file cleanup. + * + * @throws IOException if an error occurs while performing the checkpoint. + */ + protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException; + + /** + * Creates a new ByteSequence that represents the marshaled form of the given Journal Command. + * + * @param command + * The Journal Command that should be marshaled to bytes for writing. + * + * @return the byte representation of the given journal command. + * + * @throws IOException if an error occurs while serializing the command. + */ + protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { + int size = data.serializedSizeFramed(); + DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); + os.writeByte(data.type().getNumber()); + data.writeFramed(os); + return os.toByteSequence(); + } + + /** + * Create the PageFile instance and configure it using the configuration options + * currently set. + * + * @return the newly created and configured PageFile instance. + */ + protected PageFile createPageFile() { + PageFile index = new PageFile(getDirectory(), getPageFileName()); + index.setEnableWriteThread(isEnableIndexWriteAsync()); + index.setWriteBatchSize(getIndexWriteBatchSize()); + index.setPageCacheSize(getIndexCacheSize()); + index.setUseLFRUEviction(isUseIndexLFRUEviction()); + index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); + index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); + index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); + index.setEnablePageCaching(isEnableIndexPageCaching()); + return index; + } + + /** + * Create a new Journal instance and configure it using the currently set configuration + * options. If an archive directory is configured than this method will attempt to create + * that directory if it does not already exist. + * + * @return the newly created an configured Journal instance. + * + * @throws IOException if an error occurs while creating the Journal object. + */ + protected Journal createJournal() throws IOException { + Journal manager = new Journal(); + manager.setDirectory(getDirectory()); + manager.setMaxFileLength(getJournalMaxFileLength()); + manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles()); + manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles()); + manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); + manager.setArchiveDataLogs(isArchiveDataLogs()); + manager.setSizeAccumulator(journalSize); + manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); + if (getDirectoryArchive() != null) { + IOHelper.mkdirs(getDirectoryArchive()); + manager.setDirectoryArchive(getDirectoryArchive()); + } + return manager; + } + + /** + * Starts the checkpoint Thread instance if not already running and not disabled + * by configuration. + */ + protected void startCheckpoint() { + if (checkpointInterval == 0 && cleanupInterval == 0) { + LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); + return; + } + synchronized (checkpointThreadLock) { + boolean start = false; + if (checkpointThread == null) { + start = true; + } else if (!checkpointThread.isAlive()) { + start = true; + LOG.info("KahaDB: Recovering checkpoint thread after death"); + } + if (start) { + checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { + @Override + public void run() { + try { + long lastCleanup = System.currentTimeMillis(); + long lastCheckpoint = System.currentTimeMillis(); + // Sleep for a short time so we can periodically check + // to see if we need to exit this thread. + long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); + while (opened.get()) { + Thread.sleep(sleepTime); + long now = System.currentTimeMillis(); + if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { + checkpointCleanup(true); + lastCleanup = now; + lastCheckpoint = now; + } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { + checkpointCleanup(false); + lastCheckpoint = now; + } + } + } catch (InterruptedException e) { + // Looks like someone really wants us to exit this thread... + } catch (IOException ioe) { + LOG.error("Checkpoint failed", ioe); + brokerService.handleIOException(ioe); + } + } + }; + + checkpointThread.setDaemon(true); + checkpointThread.start(); + } + } + } + + /** + * Called from the worker thread to start a checkpoint. + * + * This method ensure that the store is in an opened state and optionaly logs information + * related to slow store access times. + * + * @param cleanup + * Should a cleanup of the journal occur during the checkpoint operation. + * + * @throws IOException if an error occurs during the checkpoint operation. + */ + protected void checkpointCleanup(final boolean cleanup) throws IOException { + long start; + this.indexLock.writeLock().lock(); + try { + start = System.currentTimeMillis(); + if (!opened.get()) { + return; + } + } finally { + this.indexLock.writeLock().unlock(); + } + checkpointUpdate(cleanup); + long end = System.currentTimeMillis(); + if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { + LOG.info("Slow KahaDB access: cleanup took {}", (end - start)); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java new file mode 100644 index 0000000..defb238 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Page; +import org.apache.activemq.store.kahadb.disk.page.Transaction; + +/** + * Interface for the store meta data used to hold the index value and other needed + * information to manage a KahaDB store instance. + */ +public interface KahaDBMetaData<T> { + + /** + * Indicates that this meta data instance has been opened and is active. + */ + public static final int OPEN_STATE = 2; + + /** + * Indicates that this meta data instance has been closed and is no longer active. + */ + public static final int CLOSED_STATE = 1; + + /** + * Gets the Page in the store PageFile where the KahaDBMetaData instance is stored. + * + * @return the Page to use to start access the KahaDBMetaData instance. + */ + Page<T> getPage(); + + /** + * Sets the Page instance used to load and store the KahaDBMetaData instance. + * + * @param page + * the new Page value to use. + */ + void setPage(Page<T> page); + + /** + * Gets the state flag of this meta data instance. + * + * @return the current state value for this instance. + */ + int getState(); + + /** + * Sets the current value of the state flag. + * + * @param value + * the new value to assign to the state flag. + */ + void setState(int value); + + /** + * Returns the Journal Location value that indicates that last recorded update + * that was successfully performed for this KahaDB store implementation. + * + * @return the location of the last successful update location. + */ + Location getLastUpdateLocation(); + + /** + * Updates the value of the last successful update. + * + * @param location + * the new value to assign the last update location field. + */ + void setLastUpdateLocation(Location location); + + /** + * For a newly created KahaDBMetaData instance this method is called to allow + * the instance to create all of it's internal indices and other state data. + * + * @param tx + * the Transaction instance under which the operation is executed. + * + * @throws IOException if an error occurs while creating the meta data structures. + */ + void initialize(Transaction tx) throws IOException; + + /** + * Instructs this object to load its internal data structures from the KahaDB PageFile + * and prepare itself for use. + * + * @param tx + * the Transaction instance under which the operation is executed. + * + * @throws IOException if an error occurs while creating the meta data structures. + */ + void load(Transaction tx) throws IOException; + + /** + * Reads the serialized for of this object from the KadaDB PageFile and prepares it + * for use. This method does not need to perform a full load of the meta data structures + * only read in the information necessary to load them from the PageFile on a call to the + * load method. + * + * @param in + * the DataInput instance used to read this objects serialized form. + * + * @throws IOException if an error occurs while reading the serialized form. + */ + void read(DataInput in) throws IOException; + + /** + * Writes the object into a serialized form which can be read back in again using the + * read method. + * + * @param out + * the DataOutput instance to use to write the current state to a serialized form. + * + * @throws IOException if an error occurs while serializing this instance. + */ + void write(DataOutput out) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index d8b986e..8ca8ca4 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -31,6 +31,7 @@ import org.apache.activemq.broker.LockableServiceSupport; import org.apache.activemq.broker.Locker; import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.PersistenceAdapterView; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -39,7 +40,14 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.*; +import org.apache.activemq.store.JournaledStore; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.SharedFileLocker; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionIdTransformer; +import org.apache.activemq.store.TransactionIdTransformerAware; +import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; @@ -642,4 +650,9 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { getStore().setTransactionIdTransformer(transactionIdTransformer); } + + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + return this.letter.createJobSchedulerStore(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 60c0738..975cd05 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; @@ -55,7 +56,14 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.*; +import org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.ListenableFuture; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionIdTransformer; +import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; @@ -66,6 +74,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; @@ -172,6 +181,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public int getMaxAsyncJobs() { return this.maxAsyncJobs; } + /** * @param maxAsyncJobs * the maxAsyncJobs to set @@ -426,6 +436,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } + @Override public void updateMessage(Message message) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); @@ -472,7 +483,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { indexLock.writeLock().lock(); try { location = findMessageLocation(key, dest); - }finally { + } finally { indexLock.writeLock().unlock(); } if (location == null) { @@ -492,19 +503,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public Integer execute(Transaction tx) throws IOException { // Iterate through all index entries to get a count - // of - // messages in the destination. + // of messages in the destination. StoredDestination sd = getStoredDestination(dest, tx); int rc = 0; - for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator - .hasNext();) { + for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { iterator.next(); rc++; } return rc; } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } finally { @@ -525,7 +534,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return sd.locationIndex.isEmpty(tx); } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -552,12 +561,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } - @Override public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { indexLock.writeLock().lock(); @@ -583,7 +591,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.orderIndex.stoppedIterating(); } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -628,7 +636,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { }); } catch (Exception e) { LOG.error("Failed to reset batching",e); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -641,8 +649,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { lockAsyncJobQueue(); // Hopefully one day the page file supports concurrent read - // operations... but for now we must - // externally synchronize... + // operations... but for now we must externally synchronize... indexLock.writeLock().lock(); try { @@ -725,8 +732,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - MessageId messageId, MessageAck ack) - throws IOException { + MessageId messageId, MessageAck ack) throws IOException { String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); if (isConcurrentStoreAndDispatchTopics()) { AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); @@ -810,7 +816,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - }finally { + } finally { indexLock.writeLock().unlock(); } @@ -836,7 +842,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { .getSubscriptionInfo().newInput())); } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -859,7 +865,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return (int) getStoredMessageCount(tx, sd, subscriptionKey); } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -890,7 +896,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.orderIndex.resetCursorPosition(); } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -943,7 +949,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -1358,7 +1364,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { LOG.warn("Failed to aquire lock", e); } } - } @Override @@ -1422,7 +1427,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (runnable instanceof StoreTask) { ((StoreTask)runnable).releaseLocks(); } - } } + + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + return new JobSchedulerStoreImpl(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index d10c4eb..eca83e8 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -16,12 +16,44 @@ */ package org.apache.activemq.store.kahadb; -import org.apache.activemq.broker.*; -import org.apache.activemq.command.*; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.transaction.xa.Xid; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.Lockable; +import org.apache.activemq.broker.LockableServiceSupport; +import org.apache.activemq.broker.Locker; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.XATransactionId; import org.apache.activemq.filter.AnyDestination; import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.store.*; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.SharedFileLocker; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionIdTransformer; +import org.apache.activemq.store.TransactionIdTransformerAware; +import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOHelper; @@ -30,13 +62,6 @@ import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.transaction.xa.Xid; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.*; - /** * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports * distribution of destinations across multiple kahaDB persistence adapters @@ -50,6 +75,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616")); final class DelegateDestinationMap extends DestinationMap { + @Override public void setEntries(List<DestinationMapEntry> entries) { super.setEntries(entries); } @@ -252,7 +278,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } if (adapter instanceof PersistenceAdapter) { adapter.removeQueueMessageStore(destination); - removeMessageStore((PersistenceAdapter)adapter, destination); + removeMessageStore(adapter, destination); destinationMap.removeAll(destination); } } @@ -267,7 +293,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } if (adapter instanceof PersistenceAdapter) { adapter.removeTopicMessageStore(destination); - removeMessageStore((PersistenceAdapter)adapter, destination); + removeMessageStore(adapter, destination); destinationMap.removeAll(destination); } } @@ -453,6 +479,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } } + @Override public BrokerService getBrokerService() { return brokerService; } @@ -503,4 +530,9 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem locker.configure(this); return locker; } + + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + return new JobSchedulerStoreImpl(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java index c7ece83..8840a1d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java @@ -31,16 +31,24 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.store.*; +import org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.ListenableFuture; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.ProxyMessageStore; +import org.apache.activemq.store.ProxyTopicMessageStore; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionRecoveryListener; +import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaEntryType; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; -import org.apache.activemq.util.IOHelper; import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.util.DataByteArrayInputStream; import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,6 +194,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return inflightTransactions.remove(txid); } + @Override public void prepare(TransactionId txid) throws IOException { Tx tx = getTx(txid); for (TransactionStore store : tx.getStores()) { @@ -193,6 +202,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } } + @Override public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) throws IOException { @@ -247,6 +257,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return location; } + @Override public void rollback(TransactionId txid) throws IOException { Tx tx = removeTx(txid); if (tx != null) { @@ -256,6 +267,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } } + @Override public void start() throws Exception { journal = new Journal() { @Override @@ -289,6 +301,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore"); } + @Override public void stop() throws Exception { journal.close(); journal = null; @@ -334,6 +347,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } + @Override public synchronized void recover(final TransactionRecoveryListener listener) throws IOException { for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) { http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 66ae496..45e35c6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -22,12 +22,13 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; @@ -51,31 +52,35 @@ import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; +import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.store.kahadb.data.KahaLocation; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; -import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.Transaction; public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware { private final WireFormat wireFormat = new OpenWireFormat(); private BrokerService brokerService; + @Override public void setBrokerName(String brokerName) { } + @Override public void setUsageManager(SystemUsage usageManager) { } + @Override public TransactionStore createTransactionStore() throws IOException { return new TransactionStore(){ - + + @Override public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { if (preCommit != null) { preCommit.run(); @@ -85,18 +90,21 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA postCommit.run(); } } + @Override public void prepare(TransactionId txid) throws IOException { - processPrepare(txid); + processPrepare(txid); } + @Override public void rollback(TransactionId txid) throws IOException { - processRollback(txid); + processRollback(txid); } + @Override public void recover(TransactionRecoveryListener listener) throws IOException { for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) { XATransactionId xid = (XATransactionId)entry.getKey(); ArrayList<Message> messageList = new ArrayList<Message>(); ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); - + for (Operation op : entry.getValue()) { if( op.getClass() == AddOpperation.class ) { AddOpperation addOp = (AddOpperation)op; @@ -108,7 +116,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA ackList.add(ack); } } - + Message[] addedMessages = new Message[messageList.size()]; MessageAck[] acks = new MessageAck[ackList.size()]; messageList.toArray(addedMessages); @@ -116,8 +124,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA listener.recover(xid, addedMessages, acks); } } + @Override public void start() throws Exception { } + @Override public void stop() throws Exception { } }; @@ -136,13 +146,15 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return destination; } + @Override public void addMessage(ConnectionContext context, Message message) throws IOException { KahaAddMessageCommand command = new KahaAddMessageCommand(); command.setDestination(dest); command.setMessageId(message.getMessageId().toProducerKey()); processAdd(command, message.getTransactionId(), wireFormat.marshal(message)); } - + + @Override public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); @@ -150,20 +162,23 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA processRemove(command, ack.getTransactionId()); } + @Override public void removeAllMessages(ConnectionContext context) throws IOException { KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); command.setDestination(dest); process(command); } + @Override public Message getMessage(MessageId identity) throws IOException { final String key = identity.toProducerKey(); - + // Hopefully one day the page file supports concurrent read operations... but for now we must // externally synchronize... ByteSequence data; synchronized(indexMutex) { data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){ + @Override public ByteSequence execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long sequence = sd.messageIdIndex.get(tx, key); @@ -177,14 +192,16 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA if( data == null ) { return null; } - + Message msg = (Message)wireFormat.unmarshal( data ); - return msg; + return msg; } - + + @Override public int getMessageCount() throws IOException { synchronized(indexMutex) { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ + @Override public Integer execute(Transaction tx) throws IOException { // Iterate through all index entries to get a count of messages in the destination. StoredDestination sd = getStoredDestination(dest, tx); @@ -199,9 +216,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } + @Override public void recover(final MessageRecoveryListener listener) throws Exception { synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ + @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { @@ -214,10 +233,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } long cursorPos=0; - + + @Override public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ + @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Entry<Long, MessageRecord> entry=null; @@ -238,20 +259,22 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } + @Override public void resetBatching() { cursorPos=0; } - + @Override public void setBatch(MessageId identity) throws IOException { final String key = identity.toProducerKey(); - + // Hopefully one day the page file supports concurrent read operations... but for now we must // externally synchronize... Long location; synchronized(indexMutex) { location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){ + @Override public Long execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); return sd.messageIdIndex.get(tx, key); @@ -261,7 +284,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA if( location!=null ) { cursorPos=location+1; } - + } @Override @@ -273,14 +296,15 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA @Override public void stop() throws Exception { } - + } - + class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { public KahaDBTopicMessageStore(ActiveMQTopic destination) { super(destination); } - + + @Override public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); @@ -294,6 +318,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA processRemove(command, null); } + @Override public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); KahaSubscriptionCommand command = new KahaSubscriptionCommand(); @@ -305,6 +330,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA process(command); } + @Override public void deleteSubscription(String clientId, String subscriptionName) throws IOException { KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); @@ -312,11 +338,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA process(command); } + @Override public SubscriptionInfo[] getAllSubscriptions() throws IOException { - + final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<IOException>(){ + @Override public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { @@ -328,16 +356,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } }); } - + SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; subscriptions.toArray(rc); return rc; } + @Override public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){ + @Override public SubscriptionInfo execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); @@ -349,11 +379,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA }); } } - + + @Override public int getMessageCount(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ + @Override public Integer execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); @@ -362,7 +394,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return 0; } cursorPos += 1; - + int counter = 0; for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { iterator.next(); @@ -371,18 +403,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return counter; } }); - } + } } + @Override public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ + @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); cursorPos += 1; - + for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { Entry<Long, MessageRecord> entry = iterator.next(); listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); @@ -392,10 +426,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } + @Override public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ + @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); @@ -403,7 +439,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); cursorPos += 1; } - + Entry<Long, MessageRecord> entry=null; int counter = 0; for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { @@ -422,11 +458,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } + @Override public void resetBatching(String clientId, String subscriptionName) { try { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<IOException>(){ + @Override public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); sd.subscriptionCursors.remove(subscriptionKey); @@ -442,11 +480,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA String subscriptionKey(String clientId, String subscriptionName){ return clientId+":"+subscriptionName; } - + + @Override public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { return new KahaDBMessageStore(destination); } + @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { return new KahaDBTopicMessageStore(destination); } @@ -457,6 +497,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA * * @param destination Destination to forget */ + @Override public void removeQueueMessageStore(ActiveMQQueue destination) { } @@ -466,18 +507,22 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA * * @param destination Destination to forget */ + @Override public void removeTopicMessageStore(ActiveMQTopic destination) { } + @Override public void deleteAllMessages() throws IOException { } - - + + + @Override public Set<ActiveMQDestination> getDestinations() { try { final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<IOException>(){ + @Override public void execute(Transaction tx) throws IOException { for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) { Entry<String, StoredDestination> entry = iterator.next(); @@ -491,11 +536,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA throw new RuntimeException(e); } } - + + @Override public long getLastMessageBrokerSequenceId() throws IOException { return 0; } - + + @Override public long size() { if ( !started.get() ) { return 0; @@ -507,32 +554,36 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } + @Override public void beginTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } + @Override public void commitTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } + @Override public void rollbackTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } - + + @Override public void checkpoint(boolean sync) throws IOException { - } + } /////////////////////////////////////////////////////////////////// // Internal conversion methods. /////////////////////////////////////////////////////////////////// - - + + KahaLocation convert(Location location) { KahaLocation rc = new KahaLocation(); rc.setLogId(location.getDataFileId()); rc.setOffset(location.getOffset()); return rc; } - + KahaDestination convert(ActiveMQDestination dest) { KahaDestination rc = new KahaDestination(); rc.setName(dest.getPhysicalName()); @@ -561,7 +612,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } int type = Integer.parseInt(dest.substring(0, p)); String name = dest.substring(p+1); - + switch( KahaDestination.DestinationType.valueOf(type) ) { case QUEUE: return new ActiveMQQueue(name); @@ -571,11 +622,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return new ActiveMQTempQueue(name); case TEMP_TOPIC: return new ActiveMQTempTopic(name); - default: + default: throw new IllegalArgumentException("Not in the valid destination format"); } } - + + @Override public long getLastProducerSequenceId(ProducerId id) { return -1; } @@ -592,4 +644,8 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } super.load(); } + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java index be4f2ff..43fc152 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java @@ -20,11 +20,16 @@ import java.io.IOException; import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; +import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; +import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; @@ -62,6 +67,21 @@ public class Visitor { public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException { } + public void visit(KahaAddScheduledJobCommand kahaAddScheduledJobCommand) throws IOException { + } + + public void visit(KahaRescheduleJobCommand KahaRescheduleJobCommand) throws IOException { + } + + public void visit(KahaRemoveScheduledJobCommand kahaRemoveScheduledJobCommand) throws IOException { + } + + public void visit(KahaRemoveScheduledJobsCommand kahaRemoveScheduledJobsCommand) throws IOException { + } + + public void visit(KahaDestroySchedulerCommand KahaDestroySchedulerCommand) throws IOException { + } + public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException { } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java index 86b9fa3..217bc1f 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java @@ -25,8 +25,8 @@ public class JobImpl implements Job { private final JobLocation jobLocation; private final byte[] payload; - protected JobImpl(JobLocation location,ByteSequence bs) { - this.jobLocation=location; + protected JobImpl(JobLocation location, ByteSequence bs) { + this.jobLocation = location; this.payload = new byte[bs.getLength()]; System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength()); } @@ -38,22 +38,22 @@ public class JobImpl implements Job { @Override public byte[] getPayload() { - return this.payload; + return this.payload; } @Override public long getPeriod() { - return this.jobLocation.getPeriod(); + return this.jobLocation.getPeriod(); } @Override public int getRepeat() { - return this.jobLocation.getRepeat(); + return this.jobLocation.getRepeat(); } @Override public long getStart() { - return this.jobLocation.getStartTime(); + return this.jobLocation.getStartTime(); } @Override @@ -76,4 +76,13 @@ public class JobImpl implements Job { return JobSupport.getDateTime(getStart()); } + @Override + public int getExecutionCount() { + return this.jobLocation.getRescheduledCount(); + } + + @Override + public String toString() { + return "Job: " + getJobId(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java index 13cf376..cb66145 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java @@ -36,6 +36,8 @@ class JobLocation { private long period; private String cronEntry; private final Location location; + private int rescheduledCount; + private Location lastUpdate; public JobLocation(Location location) { this.location = location; @@ -52,8 +54,12 @@ class JobLocation { this.delay = in.readLong(); this.nextTime = in.readLong(); this.period = in.readLong(); - this.cronEntry=in.readUTF(); + this.cronEntry = in.readUTF(); this.location.readExternal(in); + if (in.readBoolean()) { + this.lastUpdate = new Location(); + this.lastUpdate.readExternal(in); + } } public void writeExternal(DataOutput out) throws IOException { @@ -63,11 +69,17 @@ class JobLocation { out.writeLong(this.delay); out.writeLong(this.nextTime); out.writeLong(this.period); - if (this.cronEntry==null) { - this.cronEntry=""; + if (this.cronEntry == null) { + this.cronEntry = ""; } out.writeUTF(this.cronEntry); this.location.writeExternal(out); + if (lastUpdate != null) { + out.writeBoolean(true); + this.lastUpdate.writeExternal(out); + } else { + out.writeBoolean(false); + } } /** @@ -123,7 +135,8 @@ class JobLocation { } /** - * @param nextTime the nextTime to set + * @param nextTime + * the nextTime to set */ public synchronized void setNextTime(long nextTime) { this.nextTime = nextTime; @@ -152,7 +165,8 @@ class JobLocation { } /** - * @param cronEntry the cronEntry to set + * @param cronEntry + * the cronEntry to set */ public synchronized void setCronEntry(String cronEntry) { this.cronEntry = cronEntry; @@ -173,7 +187,8 @@ class JobLocation { } /** - * @param delay the delay to set + * @param delay + * the delay to set */ public void setDelay(long delay) { this.delay = delay; @@ -186,15 +201,55 @@ class JobLocation { return this.location; } + /** + * @returns the location in the journal of the last update issued for this + * Job. + */ + public Location getLastUpdate() { + return this.lastUpdate; + } + + /** + * Sets the location of the last update command written to the journal for + * this Job. The update commands set the next execution time for this job. + * We need to keep track of only the latest update as it's the only one we + * really need to recover the correct state from the journal. + * + * @param location + * The location in the journal of the last update command. + */ + public void setLastUpdate(Location location) { + this.lastUpdate = location; + } + + /** + * @return the number of time this job has been rescheduled. + */ + public int getRescheduledCount() { + return rescheduledCount; + } + + /** + * Sets the number of time this job has been rescheduled. A newly added job will return + * zero and increment this value each time a scheduled message is dispatched to its + * target destination and the job is rescheduled for another cycle. + * + * @param executionCount + * the new execution count to assign the JobLocation. + */ + public void setRescheduledCount(int rescheduledCount) { + this.rescheduledCount = rescheduledCount; + } + @Override public String toString() { - return "Job [id=" + jobId + ", startTime=" + new Date(startTime) - + ", delay=" + delay + ", period=" + period + ", repeat=" - + repeat + ", nextTime=" + new Date(nextTime) + "]"; + return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + ", delay=" + delay + ", period=" + period + ", repeat=" + repeat + ", nextTime=" + + new Date(nextTime) + ", executionCount = " + (rescheduledCount + 1) + "]"; } static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> { static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller(); + @Override public List<JobLocation> readPayload(DataInput dataIn) throws IOException { List<JobLocation> result = new ArrayList<JobLocation>(); @@ -228,6 +283,7 @@ class JobLocation { result = prime * result + (int) (period ^ (period >>> 32)); result = prime * result + repeat; result = prime * result + (int) (startTime ^ (startTime >>> 32)); + result = prime * result + (rescheduledCount ^ (rescheduledCount >>> 32)); return result; } @@ -286,6 +342,9 @@ class JobLocation { if (startTime != other.startTime) { return false; } + if (rescheduledCount != other.rescheduledCount) { + return false; + } return true; }
