http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java deleted file mode 100644 index 6003c87..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java +++ /dev/null @@ -1,745 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.kahadb; - -import java.io.File; -import java.io.IOException; -import java.util.Date; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.activemq.broker.LockableServiceSupport; -import org.apache.activemq.broker.Locker; -import org.apache.activemq.store.SharedFileLocker; -import org.apache.activemq.store.kahadb.data.KahaEntryType; -import org.apache.activemq.store.kahadb.data.KahaTraceCommand; -import org.apache.activemq.store.kahadb.disk.journal.Journal; -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.PageFile; -import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.DataByteArrayInputStream; -import org.apache.activemq.util.DataByteArrayOutputStream; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.ServiceStopper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class AbstractKahaDBStore extends LockableServiceSupport { - - static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class); - - public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; - public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); - - protected File directory; - protected PageFile pageFile; - protected Journal journal; - protected AtomicLong journalSize = new AtomicLong(0); - protected boolean failIfDatabaseIsLocked; - protected long checkpointInterval = 5*1000; - protected long cleanupInterval = 30*1000; - protected boolean checkForCorruptJournalFiles = false; - protected boolean checksumJournalFiles = true; - protected boolean forceRecoverIndex = false; - protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; - protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; - protected boolean archiveCorruptedIndex = false; - protected boolean enableIndexWriteAsync = false; - protected boolean enableJournalDiskSyncs = false; - protected boolean deleteAllJobs = false; - protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; - protected boolean useIndexLFRUEviction = false; - protected float indexLFUEvictionFactor = 0.2f; - protected boolean ignoreMissingJournalfiles = false; - protected int indexCacheSize = 1000; - protected boolean enableIndexDiskSyncs = true; - protected boolean enableIndexRecoveryFile = true; - protected boolean enableIndexPageCaching = true; - protected boolean archiveDataLogs; - protected boolean purgeStoreOnStartup; - protected File directoryArchive; - - protected AtomicBoolean opened = new AtomicBoolean(); - protected Thread checkpointThread; - protected final Object checkpointThreadLock = new Object(); - protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); - protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); - - /** - * @return the name to give this store's PageFile instance. - */ - protected abstract String getPageFileName(); - - /** - * @return the location of the data directory if no set by configuration. - */ - protected abstract File getDefaultDataDirectory(); - - /** - * Loads the store from disk. - * - * Based on configuration this method can either load an existing store or it can purge - * an existing store and start in a clean state. - * - * @throws IOException if an error occurs during the load. - */ - public abstract void load() throws IOException; - - /** - * Unload the state of the Store to disk and shuts down all resources assigned to this - * KahaDB store implementation. - * - * @throws IOException if an error occurs during the store unload. - */ - public abstract void unload() throws IOException; - - @Override - protected void doStart() throws Exception { - this.indexLock.writeLock().lock(); - if (getDirectory() == null) { - setDirectory(getDefaultDataDirectory()); - } - IOHelper.mkdirs(getDirectory()); - try { - if (isPurgeStoreOnStartup()) { - getJournal().start(); - getJournal().delete(); - getJournal().close(); - journal = null; - getPageFile().delete(); - LOG.info("{} Persistence store purged.", this); - setPurgeStoreOnStartup(false); - } - - load(); - store(new KahaTraceCommand().setMessage("LOADED " + new Date())); - } finally { - this.indexLock.writeLock().unlock(); - } - } - - @Override - protected void doStop(ServiceStopper stopper) throws Exception { - unload(); - } - - public PageFile getPageFile() { - if (pageFile == null) { - pageFile = createPageFile(); - } - return pageFile; - } - - public Journal getJournal() throws IOException { - if (journal == null) { - journal = createJournal(); - } - return journal; - } - - public File getDirectory() { - return directory; - } - - public void setDirectory(File directory) { - this.directory = directory; - } - - public boolean isArchiveCorruptedIndex() { - return archiveCorruptedIndex; - } - - public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { - this.archiveCorruptedIndex = archiveCorruptedIndex; - } - - public boolean isFailIfDatabaseIsLocked() { - return failIfDatabaseIsLocked; - } - - public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { - this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; - } - - public boolean isCheckForCorruptJournalFiles() { - return checkForCorruptJournalFiles; - } - - public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { - this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; - } - - public long getCheckpointInterval() { - return checkpointInterval; - } - - public void setCheckpointInterval(long checkpointInterval) { - this.checkpointInterval = checkpointInterval; - } - - public long getCleanupInterval() { - return cleanupInterval; - } - - public void setCleanupInterval(long cleanupInterval) { - this.cleanupInterval = cleanupInterval; - } - - public boolean isChecksumJournalFiles() { - return checksumJournalFiles; - } - - public void setChecksumJournalFiles(boolean checksumJournalFiles) { - this.checksumJournalFiles = checksumJournalFiles; - } - - public boolean isForceRecoverIndex() { - return forceRecoverIndex; - } - - public void setForceRecoverIndex(boolean forceRecoverIndex) { - this.forceRecoverIndex = forceRecoverIndex; - } - - public int getJournalMaxFileLength() { - return journalMaxFileLength; - } - - public void setJournalMaxFileLength(int journalMaxFileLength) { - this.journalMaxFileLength = journalMaxFileLength; - } - - public int getJournalMaxWriteBatchSize() { - return journalMaxWriteBatchSize; - } - - public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { - this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; - } - - public boolean isEnableIndexWriteAsync() { - return enableIndexWriteAsync; - } - - public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { - this.enableIndexWriteAsync = enableIndexWriteAsync; - } - - public boolean isEnableJournalDiskSyncs() { - return enableJournalDiskSyncs; - } - - public void setEnableJournalDiskSyncs(boolean syncWrites) { - this.enableJournalDiskSyncs = syncWrites; - } - - public boolean isDeleteAllJobs() { - return deleteAllJobs; - } - - public void setDeleteAllJobs(boolean deleteAllJobs) { - this.deleteAllJobs = deleteAllJobs; - } - - /** - * @return the archiveDataLogs - */ - public boolean isArchiveDataLogs() { - return this.archiveDataLogs; - } - - /** - * @param archiveDataLogs the archiveDataLogs to set - */ - public void setArchiveDataLogs(boolean archiveDataLogs) { - this.archiveDataLogs = archiveDataLogs; - } - - /** - * @return the directoryArchive - */ - public File getDirectoryArchive() { - return this.directoryArchive; - } - - /** - * @param directoryArchive the directoryArchive to set - */ - public void setDirectoryArchive(File directoryArchive) { - this.directoryArchive = directoryArchive; - } - - public int getIndexCacheSize() { - return indexCacheSize; - } - - public void setIndexCacheSize(int indexCacheSize) { - this.indexCacheSize = indexCacheSize; - } - - public int getIndexWriteBatchSize() { - return indexWriteBatchSize; - } - - public void setIndexWriteBatchSize(int indexWriteBatchSize) { - this.indexWriteBatchSize = indexWriteBatchSize; - } - - public boolean isUseIndexLFRUEviction() { - return useIndexLFRUEviction; - } - - public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { - this.useIndexLFRUEviction = useIndexLFRUEviction; - } - - public float getIndexLFUEvictionFactor() { - return indexLFUEvictionFactor; - } - - public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { - this.indexLFUEvictionFactor = indexLFUEvictionFactor; - } - - public boolean isEnableIndexDiskSyncs() { - return enableIndexDiskSyncs; - } - - public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { - this.enableIndexDiskSyncs = enableIndexDiskSyncs; - } - - public boolean isEnableIndexRecoveryFile() { - return enableIndexRecoveryFile; - } - - public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { - this.enableIndexRecoveryFile = enableIndexRecoveryFile; - } - - public boolean isEnableIndexPageCaching() { - return enableIndexPageCaching; - } - - public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { - this.enableIndexPageCaching = enableIndexPageCaching; - } - - public boolean isPurgeStoreOnStartup() { - return this.purgeStoreOnStartup; - } - - public void setPurgeStoreOnStartup(boolean purge) { - this.purgeStoreOnStartup = purge; - } - - public boolean isIgnoreMissingJournalfiles() { - return ignoreMissingJournalfiles; - } - - public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { - this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; - } - - public long size() { - if (!isStarted()) { - return 0; - } - try { - return journalSize.get() + pageFile.getDiskSize(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Locker createDefaultLocker() throws IOException { - SharedFileLocker locker = new SharedFileLocker(); - locker.setDirectory(this.getDirectory()); - return locker; - } - - @Override - public void init() throws Exception { - } - - /** - * Store a command in the Journal and process to update the Store index. - * - * @param command - * The specific JournalCommand to store and process. - * - * @returns the Location where the data was written in the Journal. - * - * @throws IOException if an error occurs storing or processing the command. - */ - public Location store(JournalCommand<?> command) throws IOException { - return store(command, isEnableIndexDiskSyncs(), null, null, null); - } - - /** - * Store a command in the Journal and process to update the Store index. - * - * @param command - * The specific JournalCommand to store and process. - * @param sync - * Should the store operation be done synchronously. (ignored if completion passed). - * - * @returns the Location where the data was written in the Journal. - * - * @throws IOException if an error occurs storing or processing the command. - */ - public Location store(JournalCommand<?> command, boolean sync) throws IOException { - return store(command, sync, null, null, null); - } - - /** - * Store a command in the Journal and process to update the Store index. - * - * @param command - * The specific JournalCommand to store and process. - * @param onJournalStoreComplete - * The Runnable to call when the Journal write operation completes. - * - * @returns the Location where the data was written in the Journal. - * - * @throws IOException if an error occurs storing or processing the command. - */ - public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException { - return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete); - } - - /** - * Store a command in the Journal and process to update the Store index. - * - * @param command - * The specific JournalCommand to store and process. - * @param sync - * Should the store operation be done synchronously. (ignored if completion passed). - * @param before - * The Runnable instance to execute before performing the store and process operation. - * @param after - * The Runnable instance to execute after performing the store and process operation. - * - * @returns the Location where the data was written in the Journal. - * - * @throws IOException if an error occurs storing or processing the command. - */ - public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException { - return store(command, sync, before, after, null); - } - - /** - * All updated are are funneled through this method. The updates are converted to a - * JournalMessage which is logged to the journal and then the data from the JournalMessage - * is used to update the index just like it would be done during a recovery process. - * - * @param command - * The specific JournalCommand to store and process. - * @param sync - * Should the store operation be done synchronously. (ignored if completion passed). - * @param before - * The Runnable instance to execute before performing the store and process operation. - * @param after - * The Runnable instance to execute after performing the store and process operation. - * @param onJournalStoreComplete - * Callback to be run when the journal write operation is complete. - * - * @returns the Location where the data was written in the Journal. - * - * @throws IOException if an error occurs storing or processing the command. - */ - public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException { - try { - - if (before != null) { - before.run(); - } - - ByteSequence sequence = toByteSequence(command); - Location location; - checkpointLock.readLock().lock(); - try { - - long start = System.currentTimeMillis(); - location = onJournalStoreComplete == null ? journal.write(sequence, sync) : - journal.write(sequence, onJournalStoreComplete); - long start2 = System.currentTimeMillis(); - - process(command, location); - - long end = System.currentTimeMillis(); - if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { - LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms", - (start2-start), (end-start2)); - } - } finally { - checkpointLock.readLock().unlock(); - } - - if (after != null) { - after.run(); - } - - if (checkpointThread != null && !checkpointThread.isAlive()) { - startCheckpoint(); - } - return location; - } catch (IOException ioe) { - LOG.error("KahaDB failed to store to Journal", ioe); - if (brokerService != null) { - brokerService.handleIOException(ioe); - } - throw ioe; - } - } - - /** - * Loads a previously stored JournalMessage - * - * @param location - * The location of the journal command to read. - * - * @return a new un-marshaled JournalCommand instance. - * - * @throws IOException if an error occurs reading the stored command. - */ - protected JournalCommand<?> load(Location location) throws IOException { - ByteSequence data = journal.read(location); - DataByteArrayInputStream is = new DataByteArrayInputStream(data); - byte readByte = is.readByte(); - KahaEntryType type = KahaEntryType.valueOf(readByte); - if (type == null) { - try { - is.close(); - } catch (IOException e) { - } - throw new IOException("Could not load journal record. Invalid location: " + location); - } - JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); - message.mergeFramed(is); - return message; - } - - /** - * Process a stored or recovered JournalCommand instance and update the DB Index with the - * state changes that this command produces. This can be called either as a new DB operation - * or as a replay during recovery operations. - * - * @param command - * The JournalCommand to process. - * @param location - * The location in the Journal where the command was written or read from. - */ - protected abstract void process(JournalCommand<?> command, Location location) throws IOException; - - /** - * Perform a checkpoint operation with optional cleanup. - * - * Called by the checkpoint background thread periodically to initiate a checkpoint operation - * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no - * longer needed journal log files etc. - * - * @param cleanup - * Should the method do a simple checkpoint or also perform a journal cleanup. - * - * @throws IOException if an error occurs during the checkpoint operation. - */ - protected void checkpointUpdate(final boolean cleanup) throws IOException { - checkpointLock.writeLock().lock(); - try { - this.indexLock.writeLock().lock(); - try { - pageFile.tx().execute(new Transaction.Closure<IOException>() { - @Override - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, cleanup); - } - }); - } finally { - this.indexLock.writeLock().unlock(); - } - - } finally { - checkpointLock.writeLock().unlock(); - } - } - - /** - * Perform the checkpoint update operation. If the cleanup flag is true then the - * operation should also purge any unused Journal log files. - * - * This method must always be called with the checkpoint and index write locks held. - * - * @param tx - * The TX under which to perform the checkpoint update. - * @param cleanup - * Should the checkpoint also do unused Journal file cleanup. - * - * @throws IOException if an error occurs while performing the checkpoint. - */ - protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException; - - /** - * Creates a new ByteSequence that represents the marshaled form of the given Journal Command. - * - * @param command - * The Journal Command that should be marshaled to bytes for writing. - * - * @return the byte representation of the given journal command. - * - * @throws IOException if an error occurs while serializing the command. - */ - protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { - int size = data.serializedSizeFramed(); - DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); - os.writeByte(data.type().getNumber()); - data.writeFramed(os); - return os.toByteSequence(); - } - - /** - * Create the PageFile instance and configure it using the configuration options - * currently set. - * - * @return the newly created and configured PageFile instance. - */ - protected PageFile createPageFile() { - PageFile index = new PageFile(getDirectory(), getPageFileName()); - index.setEnableWriteThread(isEnableIndexWriteAsync()); - index.setWriteBatchSize(getIndexWriteBatchSize()); - index.setPageCacheSize(getIndexCacheSize()); - index.setUseLFRUEviction(isUseIndexLFRUEviction()); - index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); - index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); - index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); - index.setEnablePageCaching(isEnableIndexPageCaching()); - return index; - } - - /** - * Create a new Journal instance and configure it using the currently set configuration - * options. If an archive directory is configured than this method will attempt to create - * that directory if it does not already exist. - * - * @return the newly created an configured Journal instance. - * - * @throws IOException if an error occurs while creating the Journal object. - */ - protected Journal createJournal() throws IOException { - Journal manager = new Journal(); - manager.setDirectory(getDirectory()); - manager.setMaxFileLength(getJournalMaxFileLength()); - manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles()); - manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles()); - manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); - manager.setArchiveDataLogs(isArchiveDataLogs()); - manager.setSizeAccumulator(journalSize); - manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); - if (getDirectoryArchive() != null) { - IOHelper.mkdirs(getDirectoryArchive()); - manager.setDirectoryArchive(getDirectoryArchive()); - } - return manager; - } - - /** - * Starts the checkpoint Thread instance if not already running and not disabled - * by configuration. - */ - protected void startCheckpoint() { - if (checkpointInterval == 0 && cleanupInterval == 0) { - LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); - return; - } - synchronized (checkpointThreadLock) { - boolean start = false; - if (checkpointThread == null) { - start = true; - } else if (!checkpointThread.isAlive()) { - start = true; - LOG.info("KahaDB: Recovering checkpoint thread after death"); - } - if (start) { - checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { - @Override - public void run() { - try { - long lastCleanup = System.currentTimeMillis(); - long lastCheckpoint = System.currentTimeMillis(); - // Sleep for a short time so we can periodically check - // to see if we need to exit this thread. - long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); - while (opened.get()) { - Thread.sleep(sleepTime); - long now = System.currentTimeMillis(); - if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { - checkpointCleanup(true); - lastCleanup = now; - lastCheckpoint = now; - } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { - checkpointCleanup(false); - lastCheckpoint = now; - } - } - } catch (InterruptedException e) { - // Looks like someone really wants us to exit this thread... - } catch (IOException ioe) { - LOG.error("Checkpoint failed", ioe); - brokerService.handleIOException(ioe); - } - } - }; - - checkpointThread.setDaemon(true); - checkpointThread.start(); - } - } - } - - /** - * Called from the worker thread to start a checkpoint. - * - * This method ensure that the store is in an opened state and optionaly logs information - * related to slow store access times. - * - * @param cleanup - * Should a cleanup of the journal occur during the checkpoint operation. - * - * @throws IOException if an error occurs during the checkpoint operation. - */ - protected void checkpointCleanup(final boolean cleanup) throws IOException { - long start; - this.indexLock.writeLock().lock(); - try { - start = System.currentTimeMillis(); - if (!opened.get()) { - return; - } - } finally { - this.indexLock.writeLock().unlock(); - } - checkpointUpdate(cleanup); - long end = System.currentTimeMillis(); - if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { - LOG.info("Slow KahaDB access: cleanup took {}", (end - start)); - } - } -}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java deleted file mode 100644 index defb238..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.Page; -import org.apache.activemq.store.kahadb.disk.page.Transaction; - -/** - * Interface for the store meta data used to hold the index value and other needed - * information to manage a KahaDB store instance. - */ -public interface KahaDBMetaData<T> { - - /** - * Indicates that this meta data instance has been opened and is active. - */ - public static final int OPEN_STATE = 2; - - /** - * Indicates that this meta data instance has been closed and is no longer active. - */ - public static final int CLOSED_STATE = 1; - - /** - * Gets the Page in the store PageFile where the KahaDBMetaData instance is stored. - * - * @return the Page to use to start access the KahaDBMetaData instance. - */ - Page<T> getPage(); - - /** - * Sets the Page instance used to load and store the KahaDBMetaData instance. - * - * @param page - * the new Page value to use. - */ - void setPage(Page<T> page); - - /** - * Gets the state flag of this meta data instance. - * - * @return the current state value for this instance. - */ - int getState(); - - /** - * Sets the current value of the state flag. - * - * @param value - * the new value to assign to the state flag. - */ - void setState(int value); - - /** - * Returns the Journal Location value that indicates that last recorded update - * that was successfully performed for this KahaDB store implementation. - * - * @return the location of the last successful update location. - */ - Location getLastUpdateLocation(); - - /** - * Updates the value of the last successful update. - * - * @param location - * the new value to assign the last update location field. - */ - void setLastUpdateLocation(Location location); - - /** - * For a newly created KahaDBMetaData instance this method is called to allow - * the instance to create all of it's internal indices and other state data. - * - * @param tx - * the Transaction instance under which the operation is executed. - * - * @throws IOException if an error occurs while creating the meta data structures. - */ - void initialize(Transaction tx) throws IOException; - - /** - * Instructs this object to load its internal data structures from the KahaDB PageFile - * and prepare itself for use. - * - * @param tx - * the Transaction instance under which the operation is executed. - * - * @throws IOException if an error occurs while creating the meta data structures. - */ - void load(Transaction tx) throws IOException; - - /** - * Reads the serialized for of this object from the KadaDB PageFile and prepares it - * for use. This method does not need to perform a full load of the meta data structures - * only read in the information necessary to load them from the PageFile on a call to the - * load method. - * - * @param in - * the DataInput instance used to read this objects serialized form. - * - * @throws IOException if an error occurs while reading the serialized form. - */ - void read(DataInput in) throws IOException; - - /** - * Writes the object into a serialized form which can be read back in again using the - * read method. - * - * @param out - * the DataOutput instance to use to write the current state to a serialized form. - * - * @throws IOException if an error occurs while serializing this instance. - */ - void write(DataOutput out) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 9b83a0e..e199d68 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -31,7 +31,6 @@ import org.apache.activemq.broker.LockableServiceSupport; import org.apache.activemq.broker.Locker; import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.PersistenceAdapterView; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -40,14 +39,7 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.JournaledStore; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.SharedFileLocker; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionIdTransformer; -import org.apache.activemq.store.TransactionIdTransformerAware; -import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.*; import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; @@ -667,9 +659,4 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { getStore().setTransactionIdTransformer(transactionIdTransformer); } - - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - return this.letter.createJobSchedulerStore(); - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 975cd05..60c0738 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; @@ -56,14 +55,7 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.AbstractMessageStore; -import org.apache.activemq.store.ListenableFuture; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionIdTransformer; -import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.*; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; @@ -74,7 +66,6 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; @@ -181,7 +172,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public int getMaxAsyncJobs() { return this.maxAsyncJobs; } - /** * @param maxAsyncJobs * the maxAsyncJobs to set @@ -436,7 +426,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } - @Override public void updateMessage(Message message) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); @@ -483,7 +472,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { indexLock.writeLock().lock(); try { location = findMessageLocation(key, dest); - } finally { + }finally { indexLock.writeLock().unlock(); } if (location == null) { @@ -503,17 +492,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public Integer execute(Transaction tx) throws IOException { // Iterate through all index entries to get a count - // of messages in the destination. + // of + // messages in the destination. StoredDestination sd = getStoredDestination(dest, tx); int rc = 0; - for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { + for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator + .hasNext();) { iterator.next(); rc++; } return rc; } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } finally { @@ -534,7 +525,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return sd.locationIndex.isEmpty(tx); } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } @@ -561,11 +552,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } + @Override public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { indexLock.writeLock().lock(); @@ -591,7 +583,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.orderIndex.stoppedIterating(); } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } @@ -636,7 +628,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { }); } catch (Exception e) { LOG.error("Failed to reset batching",e); - } finally { + }finally { indexLock.writeLock().unlock(); } } @@ -649,7 +641,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { lockAsyncJobQueue(); // Hopefully one day the page file supports concurrent read - // operations... but for now we must externally synchronize... + // operations... but for now we must + // externally synchronize... indexLock.writeLock().lock(); try { @@ -732,7 +725,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - MessageId messageId, MessageAck ack) throws IOException { + MessageId messageId, MessageAck ack) + throws IOException { String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); if (isConcurrentStoreAndDispatchTopics()) { AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); @@ -816,7 +810,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - } finally { + }finally { indexLock.writeLock().unlock(); } @@ -842,7 +836,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { .getSubscriptionInfo().newInput())); } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } @@ -865,7 +859,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return (int) getStoredMessageCount(tx, sd, subscriptionKey); } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } @@ -896,7 +890,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.orderIndex.resetCursorPosition(); } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } @@ -949,7 +943,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - } finally { + }finally { indexLock.writeLock().unlock(); } } @@ -1364,6 +1358,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { LOG.warn("Failed to aquire lock", e); } } + } @Override @@ -1427,11 +1422,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (runnable instanceof StoreTask) { ((StoreTask)runnable).releaseLocks(); } - } - } - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - return new JobSchedulerStoreImpl(); + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index eca83e8..d10c4eb 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -16,44 +16,12 @@ */ package org.apache.activemq.store.kahadb; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.transaction.xa.Xid; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.Lockable; -import org.apache.activemq.broker.LockableServiceSupport; -import org.apache.activemq.broker.Locker; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.LocalTransactionId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.broker.*; +import org.apache.activemq.command.*; import org.apache.activemq.filter.AnyDestination; import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.SharedFileLocker; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionIdTransformer; -import org.apache.activemq.store.TransactionIdTransformerAware; -import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.apache.activemq.store.*; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOHelper; @@ -62,6 +30,13 @@ import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.transaction.xa.Xid; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.*; + /** * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports * distribution of destinations across multiple kahaDB persistence adapters @@ -75,7 +50,6 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616")); final class DelegateDestinationMap extends DestinationMap { - @Override public void setEntries(List<DestinationMapEntry> entries) { super.setEntries(entries); } @@ -278,7 +252,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } if (adapter instanceof PersistenceAdapter) { adapter.removeQueueMessageStore(destination); - removeMessageStore(adapter, destination); + removeMessageStore((PersistenceAdapter)adapter, destination); destinationMap.removeAll(destination); } } @@ -293,7 +267,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } if (adapter instanceof PersistenceAdapter) { adapter.removeTopicMessageStore(destination); - removeMessageStore(adapter, destination); + removeMessageStore((PersistenceAdapter)adapter, destination); destinationMap.removeAll(destination); } } @@ -479,7 +453,6 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } } - @Override public BrokerService getBrokerService() { return brokerService; } @@ -530,9 +503,4 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem locker.configure(this); return locker; } - - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - return new JobSchedulerStoreImpl(); - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java index 8840a1d..c7ece83 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java @@ -31,24 +31,16 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.store.AbstractMessageStore; -import org.apache.activemq.store.ListenableFuture; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.ProxyMessageStore; -import org.apache.activemq.store.ProxyTopicMessageStore; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionRecoveryListener; -import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.*; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaEntryType; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; +import org.apache.activemq.util.IOHelper; import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.util.DataByteArrayInputStream; import org.apache.activemq.util.DataByteArrayOutputStream; -import org.apache.activemq.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,7 +186,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return inflightTransactions.remove(txid); } - @Override public void prepare(TransactionId txid) throws IOException { Tx tx = getTx(txid); for (TransactionStore store : tx.getStores()) { @@ -202,7 +193,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } } - @Override public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) throws IOException { @@ -257,7 +247,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return location; } - @Override public void rollback(TransactionId txid) throws IOException { Tx tx = removeTx(txid); if (tx != null) { @@ -267,7 +256,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } } - @Override public void start() throws Exception { journal = new Journal() { @Override @@ -301,7 +289,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore"); } - @Override public void stop() throws Exception { journal.close(); journal = null; @@ -347,7 +334,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } - @Override public synchronized void recover(final TransactionRecoveryListener listener) throws IOException { for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) { http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 45e35c6..66ae496 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -22,13 +22,12 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; +import java.util.Map.Entry; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; @@ -52,35 +51,31 @@ import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; -import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.store.kahadb.data.KahaLocation; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Transaction; public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware { private final WireFormat wireFormat = new OpenWireFormat(); private BrokerService brokerService; - @Override public void setBrokerName(String brokerName) { } - @Override public void setUsageManager(SystemUsage usageManager) { } - @Override public TransactionStore createTransactionStore() throws IOException { return new TransactionStore(){ - - @Override + public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { if (preCommit != null) { preCommit.run(); @@ -90,21 +85,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA postCommit.run(); } } - @Override public void prepare(TransactionId txid) throws IOException { - processPrepare(txid); + processPrepare(txid); } - @Override public void rollback(TransactionId txid) throws IOException { - processRollback(txid); + processRollback(txid); } - @Override public void recover(TransactionRecoveryListener listener) throws IOException { for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) { XATransactionId xid = (XATransactionId)entry.getKey(); ArrayList<Message> messageList = new ArrayList<Message>(); ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); - + for (Operation op : entry.getValue()) { if( op.getClass() == AddOpperation.class ) { AddOpperation addOp = (AddOpperation)op; @@ -116,7 +108,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA ackList.add(ack); } } - + Message[] addedMessages = new Message[messageList.size()]; MessageAck[] acks = new MessageAck[ackList.size()]; messageList.toArray(addedMessages); @@ -124,10 +116,8 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA listener.recover(xid, addedMessages, acks); } } - @Override public void start() throws Exception { } - @Override public void stop() throws Exception { } }; @@ -146,15 +136,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return destination; } - @Override public void addMessage(ConnectionContext context, Message message) throws IOException { KahaAddMessageCommand command = new KahaAddMessageCommand(); command.setDestination(dest); command.setMessageId(message.getMessageId().toProducerKey()); processAdd(command, message.getTransactionId(), wireFormat.marshal(message)); } - - @Override + public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); @@ -162,23 +150,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA processRemove(command, ack.getTransactionId()); } - @Override public void removeAllMessages(ConnectionContext context) throws IOException { KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); command.setDestination(dest); process(command); } - @Override public Message getMessage(MessageId identity) throws IOException { final String key = identity.toProducerKey(); - + // Hopefully one day the page file supports concurrent read operations... but for now we must // externally synchronize... ByteSequence data; synchronized(indexMutex) { data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){ - @Override public ByteSequence execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long sequence = sd.messageIdIndex.get(tx, key); @@ -192,16 +177,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA if( data == null ) { return null; } - + Message msg = (Message)wireFormat.unmarshal( data ); - return msg; + return msg; } - - @Override + public int getMessageCount() throws IOException { synchronized(indexMutex) { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ - @Override public Integer execute(Transaction tx) throws IOException { // Iterate through all index entries to get a count of messages in the destination. StoredDestination sd = getStoredDestination(dest, tx); @@ -216,11 +199,9 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } - @Override public void recover(final MessageRecoveryListener listener) throws Exception { synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ - @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { @@ -233,12 +214,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } long cursorPos=0; - - @Override + public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ - @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Entry<Long, MessageRecord> entry=null; @@ -259,22 +238,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } - @Override public void resetBatching() { cursorPos=0; } - + @Override public void setBatch(MessageId identity) throws IOException { final String key = identity.toProducerKey(); - + // Hopefully one day the page file supports concurrent read operations... but for now we must // externally synchronize... Long location; synchronized(indexMutex) { location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){ - @Override public Long execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); return sd.messageIdIndex.get(tx, key); @@ -284,7 +261,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA if( location!=null ) { cursorPos=location+1; } - + } @Override @@ -296,15 +273,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA @Override public void stop() throws Exception { } - + } - + class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { public KahaDBTopicMessageStore(ActiveMQTopic destination) { super(destination); } - - @Override + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); @@ -318,7 +294,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA processRemove(command, null); } - @Override public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); KahaSubscriptionCommand command = new KahaSubscriptionCommand(); @@ -330,7 +305,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA process(command); } - @Override public void deleteSubscription(String clientId, String subscriptionName) throws IOException { KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); @@ -338,13 +312,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA process(command); } - @Override public SubscriptionInfo[] getAllSubscriptions() throws IOException { - + final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<IOException>(){ - @Override public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { @@ -356,18 +328,16 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } }); } - + SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; subscriptions.toArray(rc); return rc; } - @Override public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){ - @Override public SubscriptionInfo execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); @@ -379,13 +349,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA }); } } - - @Override + public int getMessageCount(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ - @Override public Integer execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); @@ -394,7 +362,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return 0; } cursorPos += 1; - + int counter = 0; for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { iterator.next(); @@ -403,20 +371,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return counter; } }); - } + } } - @Override public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ - @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); cursorPos += 1; - + for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { Entry<Long, MessageRecord> entry = iterator.next(); listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); @@ -426,12 +392,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } - @Override public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<Exception>(){ - @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); @@ -439,7 +403,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); cursorPos += 1; } - + Entry<Long, MessageRecord> entry=null; int counter = 0; for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { @@ -458,13 +422,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } - @Override public void resetBatching(String clientId, String subscriptionName) { try { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<IOException>(){ - @Override public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); sd.subscriptionCursors.remove(subscriptionKey); @@ -480,13 +442,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA String subscriptionKey(String clientId, String subscriptionName){ return clientId+":"+subscriptionName; } - - @Override + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { return new KahaDBMessageStore(destination); } - @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { return new KahaDBTopicMessageStore(destination); } @@ -497,7 +457,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA * * @param destination Destination to forget */ - @Override public void removeQueueMessageStore(ActiveMQQueue destination) { } @@ -507,22 +466,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA * * @param destination Destination to forget */ - @Override public void removeTopicMessageStore(ActiveMQTopic destination) { } - @Override public void deleteAllMessages() throws IOException { } - - - @Override + + public Set<ActiveMQDestination> getDestinations() { try { final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); synchronized(indexMutex) { pageFile.tx().execute(new Transaction.Closure<IOException>(){ - @Override public void execute(Transaction tx) throws IOException { for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) { Entry<String, StoredDestination> entry = iterator.next(); @@ -536,13 +491,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA throw new RuntimeException(e); } } - - @Override + public long getLastMessageBrokerSequenceId() throws IOException { return 0; } - - @Override + public long size() { if ( !started.get() ) { return 0; @@ -554,36 +507,32 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } - @Override public void beginTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } - @Override public void commitTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } - @Override public void rollbackTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } - - @Override + public void checkpoint(boolean sync) throws IOException { - } + } /////////////////////////////////////////////////////////////////// // Internal conversion methods. /////////////////////////////////////////////////////////////////// + - - + KahaLocation convert(Location location) { KahaLocation rc = new KahaLocation(); rc.setLogId(location.getDataFileId()); rc.setOffset(location.getOffset()); return rc; } - + KahaDestination convert(ActiveMQDestination dest) { KahaDestination rc = new KahaDestination(); rc.setName(dest.getPhysicalName()); @@ -612,7 +561,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } int type = Integer.parseInt(dest.substring(0, p)); String name = dest.substring(p+1); - + switch( KahaDestination.DestinationType.valueOf(type) ) { case QUEUE: return new ActiveMQQueue(name); @@ -622,12 +571,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return new ActiveMQTempQueue(name); case TEMP_TOPIC: return new ActiveMQTempTopic(name); - default: + default: throw new IllegalArgumentException("Not in the valid destination format"); } } - - @Override + public long getLastProducerSequenceId(ProducerId id) { return -1; } @@ -644,8 +592,4 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } super.load(); } - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java index 43fc152..be4f2ff 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java @@ -20,16 +20,11 @@ import java.io.IOException; import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; -import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; -import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; -import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; @@ -67,21 +62,6 @@ public class Visitor { public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException { } - public void visit(KahaAddScheduledJobCommand kahaAddScheduledJobCommand) throws IOException { - } - - public void visit(KahaRescheduleJobCommand KahaRescheduleJobCommand) throws IOException { - } - - public void visit(KahaRemoveScheduledJobCommand kahaRemoveScheduledJobCommand) throws IOException { - } - - public void visit(KahaRemoveScheduledJobsCommand kahaRemoveScheduledJobsCommand) throws IOException { - } - - public void visit(KahaDestroySchedulerCommand KahaDestroySchedulerCommand) throws IOException { - } - public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException { } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java index 217bc1f..86b9fa3 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java @@ -25,8 +25,8 @@ public class JobImpl implements Job { private final JobLocation jobLocation; private final byte[] payload; - protected JobImpl(JobLocation location, ByteSequence bs) { - this.jobLocation = location; + protected JobImpl(JobLocation location,ByteSequence bs) { + this.jobLocation=location; this.payload = new byte[bs.getLength()]; System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength()); } @@ -38,22 +38,22 @@ public class JobImpl implements Job { @Override public byte[] getPayload() { - return this.payload; + return this.payload; } @Override public long getPeriod() { - return this.jobLocation.getPeriod(); + return this.jobLocation.getPeriod(); } @Override public int getRepeat() { - return this.jobLocation.getRepeat(); + return this.jobLocation.getRepeat(); } @Override public long getStart() { - return this.jobLocation.getStartTime(); + return this.jobLocation.getStartTime(); } @Override @@ -76,13 +76,4 @@ public class JobImpl implements Job { return JobSupport.getDateTime(getStart()); } - @Override - public int getExecutionCount() { - return this.jobLocation.getRescheduledCount(); - } - - @Override - public String toString() { - return "Job: " + getJobId(); - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java index cb66145..13cf376 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java @@ -36,8 +36,6 @@ class JobLocation { private long period; private String cronEntry; private final Location location; - private int rescheduledCount; - private Location lastUpdate; public JobLocation(Location location) { this.location = location; @@ -54,12 +52,8 @@ class JobLocation { this.delay = in.readLong(); this.nextTime = in.readLong(); this.period = in.readLong(); - this.cronEntry = in.readUTF(); + this.cronEntry=in.readUTF(); this.location.readExternal(in); - if (in.readBoolean()) { - this.lastUpdate = new Location(); - this.lastUpdate.readExternal(in); - } } public void writeExternal(DataOutput out) throws IOException { @@ -69,17 +63,11 @@ class JobLocation { out.writeLong(this.delay); out.writeLong(this.nextTime); out.writeLong(this.period); - if (this.cronEntry == null) { - this.cronEntry = ""; + if (this.cronEntry==null) { + this.cronEntry=""; } out.writeUTF(this.cronEntry); this.location.writeExternal(out); - if (lastUpdate != null) { - out.writeBoolean(true); - this.lastUpdate.writeExternal(out); - } else { - out.writeBoolean(false); - } } /** @@ -135,8 +123,7 @@ class JobLocation { } /** - * @param nextTime - * the nextTime to set + * @param nextTime the nextTime to set */ public synchronized void setNextTime(long nextTime) { this.nextTime = nextTime; @@ -165,8 +152,7 @@ class JobLocation { } /** - * @param cronEntry - * the cronEntry to set + * @param cronEntry the cronEntry to set */ public synchronized void setCronEntry(String cronEntry) { this.cronEntry = cronEntry; @@ -187,8 +173,7 @@ class JobLocation { } /** - * @param delay - * the delay to set + * @param delay the delay to set */ public void setDelay(long delay) { this.delay = delay; @@ -201,55 +186,15 @@ class JobLocation { return this.location; } - /** - * @returns the location in the journal of the last update issued for this - * Job. - */ - public Location getLastUpdate() { - return this.lastUpdate; - } - - /** - * Sets the location of the last update command written to the journal for - * this Job. The update commands set the next execution time for this job. - * We need to keep track of only the latest update as it's the only one we - * really need to recover the correct state from the journal. - * - * @param location - * The location in the journal of the last update command. - */ - public void setLastUpdate(Location location) { - this.lastUpdate = location; - } - - /** - * @return the number of time this job has been rescheduled. - */ - public int getRescheduledCount() { - return rescheduledCount; - } - - /** - * Sets the number of time this job has been rescheduled. A newly added job will return - * zero and increment this value each time a scheduled message is dispatched to its - * target destination and the job is rescheduled for another cycle. - * - * @param executionCount - * the new execution count to assign the JobLocation. - */ - public void setRescheduledCount(int rescheduledCount) { - this.rescheduledCount = rescheduledCount; - } - @Override public String toString() { - return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + ", delay=" + delay + ", period=" + period + ", repeat=" + repeat + ", nextTime=" - + new Date(nextTime) + ", executionCount = " + (rescheduledCount + 1) + "]"; + return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + + ", delay=" + delay + ", period=" + period + ", repeat=" + + repeat + ", nextTime=" + new Date(nextTime) + "]"; } static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> { static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller(); - @Override public List<JobLocation> readPayload(DataInput dataIn) throws IOException { List<JobLocation> result = new ArrayList<JobLocation>(); @@ -283,7 +228,6 @@ class JobLocation { result = prime * result + (int) (period ^ (period >>> 32)); result = prime * result + repeat; result = prime * result + (int) (startTime ^ (startTime >>> 32)); - result = prime * result + (rescheduledCount ^ (rescheduledCount >>> 32)); return result; } @@ -342,9 +286,6 @@ class JobLocation { if (startTime != other.startTime) { return false; } - if (rescheduledCount != other.rescheduledCount) { - return false; - } return true; }
