Author: rajdavies
Date: Thu Mar 1 11:24:17 2007
New Revision: 513455
URL: http://svn.apache.org/viewvc?view=rev&rev=513455
Log:
fix for jira issue: http://issues.apache.org/activemq/browse/AMQ-1121
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
- copied, changed from r511881,
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java
Copied:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
(from r511881,
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java?view=diff&rev=513455&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java&r1=511881&p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
Thu Mar 1 11:24:17 2007
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.kaha.impl.container;
+package org.apache.activemq.kaha;
import java.io.Externalizable;
import java.io.IOException;
@@ -31,6 +31,15 @@
private Object key;
private String dataContainerName;
+ public ContainerId() {
+ }
+
+ public ContainerId(Object key,String dataContainerName) {
+ this.key=key;
+ this.dataContainerName=dataContainerName;
+ }
+
+
/**
* @return Returns the dataContainerPrefix.
*/
@@ -39,10 +48,10 @@
}
/**
- * @param dataContainerPrefix The dataContainerPrefix to set.
+ * @param dataContainerName The dataContainerPrefix to set.
*/
- public void setDataContainerName(String dataContainerPrefix){
- this.dataContainerName=dataContainerPrefix;
+ public void setDataContainerName(String dataContainerName){
+ this.dataContainerName=dataContainerName;
}
/**
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
Thu Mar 1 11:24:17 2007
@@ -138,6 +138,13 @@
* @throws IOException
*/
public void deleteMapContainer(Object id,String containerName) throws
IOException;
+
+ /**
+ * Delete Map container
+ * @param id
+ * @throws IOException
+ */
+ public void deleteMapContainer(ContainerId id) throws IOException;
/**
* Get a Set of call MapContainer Ids
@@ -145,7 +152,7 @@
* @return the set of ids
* @throws IOException
*/
- public Set getMapContainerIds() throws IOException;
+ public Set<ContainerId> getMapContainerIds() throws IOException;
/**
* Checks if a ListContainer exists in the default container
@@ -213,6 +220,12 @@
*/
public void deleteListContainer(Object id,String containerName) throws
IOException;
+ /**
+ * delete a list container
+ * @param id
+ * @throws IOException
+ */
+ public void deleteListContainer(ContainerId id) throws IOException;
/**
* Get a Set of call ListContainer Ids
@@ -220,7 +233,7 @@
* @return the set of ids
* @throws IOException
*/
- public Set getListContainerIds() throws IOException;
+ public Set<ContainerId> getListContainerIds() throws IOException;
/**
* @return the maxDataFileLength
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
Thu Mar 1 11:24:17 2007
@@ -22,11 +22,11 @@
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
-import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
Thu Mar 1 11:24:17 2007
@@ -27,6 +27,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.RuntimeStoreException;
@@ -34,7 +35,6 @@
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.DataManagerFacade;
-import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
import org.apache.activemq.kaha.impl.data.DataManagerImpl;
@@ -218,12 +218,14 @@
public void deleteMapContainer(Object id) throws IOException{
deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
}
+
+ public void deleteMapContainer(Object id,String containerName) throws
IOException{
+ ContainerId containerId = new ContainerId(id,containerName);
+ deleteMapContainer(containerId);
+ }
- public synchronized void deleteMapContainer(Object id,String
containerName) throws IOException{
+ public synchronized void deleteMapContainer(ContainerId containerId)
throws IOException{
initialize();
- ContainerId containerId=new ContainerId();
- containerId.setKey(id);
- containerId.setDataContainerName(containerName);
MapContainerImpl container=maps.remove(containerId);
if(container!=null){
container.clear();
@@ -232,12 +234,12 @@
}
}
- public synchronized Set<Object> getMapContainerIds() throws IOException{
+ public synchronized Set<ContainerId> getMapContainerIds() throws
IOException{
initialize();
- Set<Object> set = new HashSet<Object>();
+ Set<ContainerId> set = new HashSet<ContainerId>();
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
- set.add(id.getKey());
+ set.add(id);
}
return set;
}
@@ -286,12 +288,14 @@
public void deleteListContainer(Object id) throws IOException{
deleteListContainer(id,DEFAULT_CONTAINER_NAME);
}
-
+
public synchronized void deleteListContainer(Object id,String
containerName) throws IOException{
+ ContainerId containerId=new ContainerId(id,containerName);
+ deleteListContainer(containerId);
+ }
+
+ public synchronized void deleteListContainer(ContainerId containerId)
throws IOException{
initialize();
- ContainerId containerId=new ContainerId();
- containerId.setKey(id);
- containerId.setDataContainerName(containerName);
ListContainerImpl container=lists.remove(containerId);
if(container!=null){
listsContainer.removeRoot(container.getIndexManager(),containerId);
@@ -300,12 +304,12 @@
}
}
- public synchronized Set<Object> getListContainerIds() throws IOException{
+ public synchronized Set<ContainerId> getListContainerIds() throws
IOException{
initialize();
- Set<Object> set = new HashSet<Object>();
+ Set<ContainerId> set = new HashSet<ContainerId>();
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
- set.add(id.getKey());
+ set.add(id);
}
return set;
}
@@ -333,7 +337,7 @@
if( isUseAsyncDataManager() ) {
AsyncDataManager t=new AsyncDataManager();
t.setDirectory(directory);
- t.setFilePrefix("data-"+name+"-");
+ t.setFilePrefix("async-data-"+name+"-");
t.setMaxFileLength((int) maxDataFileLength);
t.start();
dm=new DataManagerFacade(t, name);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
Thu Mar 1 11:24:17 2007
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
Thu Mar 1 11:24:17 2007
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
Thu Mar 1 11:24:17 2007
@@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
@@ -69,7 +70,7 @@
throw new RuntimeException(e);
}
}else{
- this.index=new VMIndex();
+ this.index=new VMIndex(indexManager);
}
}
index.setKeyMarshaller(keyMarshaller);
@@ -505,7 +506,7 @@
StoreLocation
data=dataManager.storeDataItem(valueMarshaller,value);
index.setValueData(data);
}
- IndexItem prev=indexList.getLast();
+ IndexItem prev=indexList.getLast();
prev=prev!=null?prev:indexList.getRoot();
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
Thu Mar 1 11:24:17 2007
@@ -34,6 +34,7 @@
private RandomAccessFile randomAcessFile;
private Object writerData;
long length=0;
+ private boolean dirty;
DataFile(File file,int number){
this.file=file;
@@ -107,6 +108,15 @@
*/
public synchronized void setWriterData(Object writerData) {
this.writerData = writerData;
+ dirty=true;
}
+
+ public synchronized boolean isDirty() {
+ return dirty;
+ }
+
+ public synchronized void setDirty(boolean value) {
+ this.dirty = value;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
Thu Mar 1 11:24:17 2007
@@ -57,7 +57,7 @@
Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
private String dataFilePrefix;
-
+
public DataManagerImpl(File dir, final String name){
this.dir=dir;
this.name=name;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
Thu Mar 1 11:24:17 2007
@@ -96,9 +96,10 @@
public synchronized void force(DataFile dataFile) throws IOException {
// If our dirty marker was set.. then we need to sync
- if( dataFile.getWriterData()!=null ) {
+ if( dataFile.getWriterData()!=null && dataFile.isDirty()) {
dataFile.getRandomAccessFile().getFD().sync();
dataFile.setWriterData(null);
+ dataFile.setDirty(false);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
Thu Mar 1 11:24:17 2007
@@ -65,6 +65,10 @@
public synchronized IndexItem getLast(){
if(size==0)
return null;
+ if(last!=null){
+ last.next=null;
+ last.setNextItem(IndexItem.POSITION_NOT_SET);
+ }
return last;
}
@@ -323,6 +327,7 @@
return;
if(e==last||e.equals(last)){
if(size>1){
+ last = (IndexItem)refreshEntry(last);
last=getPrevEntry(last);
}else{
last=null;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
Thu Mar 1 11:24:17 2007
@@ -45,6 +45,7 @@
private long length=0;
private IndexItem firstFree;
private IndexItem lastFree;
+ private boolean dirty;
public IndexManager(File directory,String name,String mode,DataManager
redoLog) throws IOException{
this.directory=directory;
@@ -76,10 +77,12 @@
lastFree.setNextItem(item.getOffset());
}
writer.updateIndexes(item);
+ dirty=true;
}
public synchronized void storeIndex(IndexItem index) throws IOException{
writer.storeItem(index);
+ dirty=true;
}
public synchronized void updateIndexes(IndexItem index) throws IOException{
@@ -88,10 +91,12 @@
}catch(Throwable e){
log.error(name+" error updating indexes ",e);
}
+ dirty=true;
}
public synchronized void redo(final RedoStoreIndexItem redo) throws
IOException{
writer.redoStoreItem(redo);
+ dirty=true;
}
public synchronized IndexItem createNewIndex() throws IOException{
@@ -113,8 +118,9 @@
}
public synchronized void force() throws IOException{
- if(indexFile!=null){
+ if(indexFile!=null && dirty){
indexFile.getFD().sync();
+ dirty=false;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
Thu Mar 1 11:24:17 2007
@@ -14,10 +14,14 @@
package org.apache.activemq.kaha.impl.index;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.impl.container.MapContainerImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Index implementation using a HashMap
@@ -25,9 +29,13 @@
* @version $Revision: 1.2 $
*/
public class VMIndex implements Index{
-
+ private static final Log log=LogFactory.getLog(VMIndex.class);
+ private IndexManager indexManager;
private Map<Object,StoreEntry> map=new HashMap<Object,StoreEntry>();
+ public VMIndex(IndexManager manager) {
+ this.indexManager= manager;
+ }
/**
*
* @see org.apache.activemq.kaha.impl.index.Index#clear()
@@ -47,10 +55,20 @@
/**
* @param key
+ * @return store entry
* @see
org.apache.activemq.kaha.impl.index.Index#removeKey(java.lang.Object)
*/
public StoreEntry remove(Object key){
- return map.remove(key);
+ StoreEntry result = map.remove(key);
+ if (result != null) {
+ try{
+ result=indexManager.refreshIndex((IndexItem)result);
+ }catch(IOException e){
+ log.error("Failed to refresh entry",e);
+ throw new RuntimeException("Failed to refresh entry");
+ }
+ }
+ return result;
}
/**
@@ -68,7 +86,16 @@
* @return the entry
*/
public StoreEntry get(Object key){
- return map.get(key);
+ StoreEntry result = map.get(key);
+ if (result != null) {
+ try{
+ result=indexManager.refreshIndex((IndexItem)result);
+ }catch(IOException e){
+ log.error("Failed to refresh entry",e);
+ throw new RuntimeException("Failed to refresh entry");
+ }
+ }
+ return result;
}
/**
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Thu Mar 1 11:24:17 2007
@@ -84,7 +84,7 @@
private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics =
new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
private AsyncDataManager asyncDataManager;
- private ReferenceStoreAdapter referenceStoreAdapter;
+ private KahaReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
@@ -106,7 +106,7 @@
private Runnable periodicCleanupTask;
private boolean deleteAllMessages;
- private File directory = new File(IOHelper.getDefaultDataDirectory() +
"/quick");
+ private File directory = new File(IOHelper.getDefaultDataDirectory() +
"/amq");
@@ -242,7 +242,9 @@
checkpointTask.wakeup();
if (sync) {
- log.debug("Waitng for checkpoint to complete.");
+ if(log.isDebugEnabled()){
+ log.debug("Waitng for checkpoint to complete.");
+ }
latch.await();
}
}
@@ -264,7 +266,10 @@
}
try {
- log.debug("Checkpoint started.");
+ if(log.isDebugEnabled()){
+ log.debug("Checkpoint started.");
+ }
+ referenceStoreAdapter.sync();
Location newMark = null;
Iterator<AMQMessageStore> iterator = queues.values().iterator();
@@ -287,7 +292,9 @@
try {
if (newMark != null) {
- log.debug("Marking journal at: " + newMark);
+ if(log.isDebugEnabled()){
+ log.debug("Marking journal at: " + newMark);
+ }
asyncDataManager.setMark(newMark, false);
writeTraceMessage("CHECKPOINT "+new Date(), true);
}
@@ -296,17 +303,12 @@
log.error("Failed to mark the Journal: " + e, e);
}
-// if (referenceStoreAdapter instanceof
JDBCReferenceStoreAdapter) {
-// // We may be check pointing more often than the
checkpointInterval if under high use
-// // But we don't want to clean up the db that often.
-// long now = System.currentTimeMillis();
-// if( now > lastCleanup+checkpointInterval ) {
-// lastCleanup = now;
-// ((JDBCReferenceStoreAdapter)
referenceStoreAdapter).cleanup();
-// }
-// }
-
- log.debug("Checkpoint done.");
+ if(log.isDebugEnabled()){
+ log.debug("Checkpoint done.");
+ }
+ }
+ catch(IOException e) {
+ log.error("Failed to sync reference store",e);
}
finally {
latch.countDown();
@@ -603,7 +605,7 @@
return manager;
}
- protected ReferenceStoreAdapter createReferenceStoreAdapter() throws
IOException {
+ protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws
IOException {
KahaReferenceStoreAdapter adaptor = new
KahaReferenceStoreAdapter(directory);
return adaptor;
}
@@ -627,9 +629,7 @@
public ReferenceStoreAdapter getReferenceStoreAdapter() {
return referenceStoreAdapter;
}
- public void setReferenceStoreAdapter(ReferenceStoreAdapter
referenceStoreAdapter) {
- this.referenceStoreAdapter = referenceStoreAdapter;
- }
+
public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Thu Mar 1 11:24:17 2007
@@ -31,9 +31,11 @@
protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
+ protected KahaReferenceStoreAdapter adapter;
protected StoreEntry batchEntry=null;
- public KahaReferenceStore(MapContainer container,ActiveMQDestination
destination) throws IOException{
+ public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer
container,ActiveMQDestination destination) throws IOException{
+ this.adapter = adapter;
this.messageContainer=container;
this.destination=destination;
}
@@ -109,10 +111,10 @@
return result.data;
}
- public void addReferenceFileIdsInUse(Set<Integer> rc){
+ public void addReferenceFileIdsInUse(){
for(StoreEntry
entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord
msg=(ReferenceRecord)messageContainer.getValue(entry);
- rc.add(msg.data.getFileId());
+ addInterest(msg);
}
}
@@ -172,10 +174,10 @@
}
void removeInterest(ReferenceRecord rr) {
-
+ adapter.removeInterestInRecordFile(rr.data.getFileId());
}
void addInterest(ReferenceRecord rr) {
-
+ adapter.addInterestInRecordFile(rr.data.getFileId());
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Thu Mar 1 11:24:17 2007
@@ -17,34 +17,39 @@
*/
package org.apache.activemq.store.kahadaptor;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
-import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageIdMarshaller;
-import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter
implements ReferenceStoreAdapter {
-
- private MapContainer<Integer, Integer> fileReferences;
+ private static final Log log =
LogFactory.getLog(KahaPersistenceAdapter.class);
+ private static final String STORE_STATE = "store-state";
+ private static final String RECORD_REFERENCES = "record-references";
+ private MapContainer stateMap;
+ private Map<Integer,AtomicInteger>recordReferences = new
HashMap<Integer,AtomicInteger>();
+ private boolean storeValid;
public KahaReferenceStoreAdapter(File dir) throws IOException {
super(dir);
@@ -59,22 +64,63 @@
}
@Override
- public void start() throws Exception {
- super.start();
-
+ public void start() throws Exception{
+ super.start();
Store store=getStore();
- fileReferences=store.getMapContainer("file-references");
- fileReferences.setKeyMarshaller(new IntegerMarshaller());
- fileReferences.setValueMarshaller(new IntegerMarshaller());
- fileReferences.load();
+ boolean empty=store.getMapContainerIds().isEmpty();
+ stateMap=store.getMapContainer("state",STORE_STATE);
+ stateMap.load();
+ if(!empty){
+
+ AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
+ if(status!=null){
+ storeValid=status.get();
+ }
+
+ if(storeValid){
+ if(stateMap.containsKey(RECORD_REFERENCES)){
+
recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
+ }
+ }else {
+ /*
+ log.warn("Store Not shutdown cleanly - clearing out unsafe
records ...");
+ Set<ContainerId> set = store.getListContainerIds();
+ for (ContainerId cid:set) {
+ if (!cid.getDataContainerName().equals(STORE_STATE)) {
+ store.deleteListContainer(cid);
+ }
+ }
+ set = store.getMapContainerIds();
+ for (ContainerId cid:set) {
+ if (!cid.getDataContainerName().equals(STORE_STATE)) {
+ store.deleteMapContainer(cid);
+ }
+ }
+ */
+ buildReferenceFileIdsInUse();
+ }
+
+ }
+ stateMap.put(STORE_STATE,new AtomicBoolean());
+ }
+
+ @Override
+ public void stop() throws Exception {
+ stateMap.put(RECORD_REFERENCES,recordReferences);
+ stateMap.put(STORE_STATE,new AtomicBoolean(true));
+ super.stop();
}
+ public boolean isStoreValid() {
+ return storeValid;
+ }
+
public ReferenceStore createQueueReferenceStore(ActiveMQQueue
destination) throws IOException {
ReferenceStore rc=(ReferenceStore)queues.get(destination);
if(rc==null){
- rc=new
KahaReferenceStore(getMapReferenceContainer(destination,"queue-data"),destination);
+ rc=new
KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
@@ -89,10 +135,10 @@
if(rc==null){
Store store=getStore();
MapContainer
messageContainer=getMapReferenceContainer(destination,"topic-data");
- MapContainer
subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
+ MapContainer
subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob");
ListContainer
ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
- rc=new
KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination);
+ rc=new
KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
@@ -102,25 +148,26 @@
return rc;
}
- public Set<Integer> getReferenceFileIdsInUse() throws IOException {
+ public void buildReferenceFileIdsInUse() throws IOException {
- Set<Integer> rc = new HashSet<Integer>();
+ recordReferences = new HashMap<Integer,AtomicInteger>();
Set<ActiveMQDestination> destinations = getDestinations();
for (ActiveMQDestination destination : destinations) {
if( destination.isQueue() ) {
KahaReferenceStore store = (KahaReferenceStore)
createQueueReferenceStore((ActiveMQQueue) destination);
- store.addReferenceFileIdsInUse(rc);
+ store.addReferenceFileIdsInUse();
} else {
KahaTopicReferenceStore store =
(KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic)
destination);
- store.addReferenceFileIdsInUse(rc);
+ store.addReferenceFileIdsInUse();
}
- }
-
- return rc;
-
+ }
}
+ public void sync() throws IOException {
+ getStore().force();
+ }
+
protected MapContainer<MessageId,ReferenceRecord>
getMapReferenceContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<MessageId, ReferenceRecord>
container=store.getMapContainer(id,containerName);
@@ -128,6 +175,33 @@
container.setValueMarshaller(new ReferenceRecordMarshaller());
container.load();
return container;
+ }
+
+ synchronized void addInterestInRecordFile(int recordNumber) {
+ Integer key = new Integer(recordNumber);
+ AtomicInteger rr = recordReferences.get(key);
+ if (rr == null) {
+ rr = new AtomicInteger();
+ recordReferences.put(key,rr);
+ }
+ rr.incrementAndGet();
+ }
+
+ synchronized void removeInterestInRecordFile(int recordNumber) {
+ Integer key = new Integer(recordNumber);
+ AtomicInteger rr = recordReferences.get(key);
+ if (rr != null && rr.decrementAndGet() <= 0) {
+ recordReferences.remove(key);
+ }
+ }
+
+ /**
+ * @return
+ * @throws IOException
+ * @see
org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
+ */
+ public Set<Integer> getReferenceFileIdsInUse() throws IOException{
+ return recordReferences.keySet();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Thu Mar 1 11:24:17 2007
@@ -86,8 +86,12 @@
TopicSubAck
tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
- ackContainer.remove(ref.getAckEntry());
- messageContainer.remove(tsa.getMessageEntry());
+ StoreEntry entry = ref.getAckEntry();
+ entry = ackContainer.refresh(entry);
+ ackContainer.remove(entry);
+ entry = tsa.getMessageEntry();
+ entry =messageContainer.refresh(entry);
+ messageContainer.remove(entry);
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Thu Mar 1 11:24:17 2007
@@ -39,9 +39,9 @@
private Store store;
protected Map subscriberMessages=new ConcurrentHashMap();
- public KahaTopicReferenceStore(Store store,MapContainer
messageContainer,ListContainer ackContainer,
+ public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter
adapter,MapContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws
IOException{
- super(messageContainer,destination);
+ super(adapter,messageContainer,destination);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
@@ -97,18 +97,18 @@
return result.data;
}
- public void addReferenceFileIdsInUse(Set<Integer> rc){
+ public void addReferenceFileIdsInUse(){
for(StoreEntry
entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
if(subAck.getCount()>0){
ReferenceRecord
rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
- rc.add(rr.data.getFileId());
+ addInterest(rr);
}
}
}
protected ListContainer addSubscriberMessageContainer(Object key) throws
IOException{
- ListContainer container=store.getListContainer(key,"topic-subs");
+ ListContainer
container=store.getListContainer(key,"topic-subs-references");
Marshaller marshaller=new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
TopicSubContainer tsc=new TopicSubContainer(container);
@@ -129,11 +129,15 @@
TopicSubAck
tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
- ackContainer.remove(ref.getAckEntry());
- ReferenceRecord rr = messageContainer.get(messageId);
- if (rr != null) {
- messageContainer.remove(tsa.getMessageEntry());
- removeInterest(rr);
+ StoreEntry entry=ref.getAckEntry();
+ entry=ackContainer.refresh(entry);
+ ackContainer.remove(entry);
+ ReferenceRecord rr=messageContainer.get(messageId);
+ if(rr!=null){
+ entry=tsa.getMessageEntry();
+ entry=messageContainer.refresh(entry);
+ messageContainer.remove(entry);
+ removeInterest(rr);
}
}else{
ackContainer.update(ref.getAckEntry(),tsa);
@@ -261,7 +265,7 @@
}
}
}
- store.deleteListContainer(key,"topic-subs");
+ store.deleteListContainer(key,"topic-subs-references");
}
protected String getSubscriptionKey(String clientId,String subscriberName){
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
Thu Mar 1 11:24:17 2007
@@ -37,10 +37,23 @@
protected MapContainer container;
protected Map testMap;
protected static final int COUNT = 10;
+
+ public void testBasicAllocations() throws Exception{
+ String key = "key";
+ Object value = testMap;
+ MapContainer test = store.getMapContainer("test","test");
+ test.put(key,value);
+ store.close();
+ store = getStore();
+ assertTrue(store.getMapContainerIds().isEmpty()==false);
+ test = store.getMapContainer("test","test");
+ assertEquals(value,test.get(key));
+
+ }
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.size()'
*/
- public void XtestSize() throws Exception {
+ public void testSize() throws Exception {
container.putAll(testMap);
assertTrue(container.size()==testMap.size());
}
@@ -48,14 +61,14 @@
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.isEmpty()'
*/
- public void XtestIsEmpty() throws Exception {
+ public void testIsEmpty() throws Exception {
assertTrue(container.isEmpty());
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.clear()'
*/
- public void XtestClear() throws Exception {
+ public void testClear() throws Exception {
container.putAll(testMap);
assertTrue(container.size()==testMap.size());
container.clear();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java
Thu Mar 1 11:24:17 2007
@@ -100,16 +100,7 @@
assertFalse(store.doesMapContainerExist(containerId));
}
- /*
- * Test method for 'org.apache.activemq.kaha.Store.getMapContainerIds()'
- */
- public void testGetMapContainerIds()throws Exception {
- String containerId = "test";
- MapContainer container = store.getMapContainer(containerId);
- Set set = store.getMapContainerIds();
- assertTrue(set.contains(containerId));
- }
-
+
/*
@@ -139,16 +130,7 @@
assertFalse(store.doesListContainerExist(containerId));
}
- /*
- * Test method for 'org.apache.activemq.kaha.Store.getListContainerIds()'
- */
- public void testGetListContainerIds()throws Exception {
- String containerId = "test";
- ListContainer container = store.getListContainer(containerId);
- Set set = store.getListContainerIds();
- assertTrue(set.contains(containerId));
- }
-
+
public void testBasicAllocations() throws Exception{
Map testMap = new HashMap();
int count = 1000;