Author: rajdavies
Date: Thu Mar  1 11:24:17 2007
New Revision: 513455

URL: http://svn.apache.org/viewvc?view=rev&rev=513455
Log:
fix for jira issue: http://issues.apache.org/activemq/browse/AMQ-1121

Added:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
      - copied, changed from r511881, 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
Removed:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java

Copied: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
 (from r511881, 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java)
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java?view=diff&rev=513455&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java&r1=511881&p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
 Thu Mar  1 11:24:17 2007
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.kaha.impl.container;
+package org.apache.activemq.kaha;
 
 import java.io.Externalizable;
 import java.io.IOException;
@@ -31,6 +31,15 @@
     private Object key;
     private String dataContainerName;
 
+    public ContainerId() {
+    }
+    
+    public ContainerId(Object key,String dataContainerName) {
+        this.key=key;
+        this.dataContainerName=dataContainerName;
+    }
+    
+    
     /**
      * @return Returns the dataContainerPrefix.
      */
@@ -39,10 +48,10 @@
     }
 
     /**
-     * @param dataContainerPrefix The dataContainerPrefix to set.
+     * @param dataContainerName The dataContainerName to set.
      */
-    public void setDataContainerName(String dataContainerPrefix){
-        this.dataContainerName=dataContainerPrefix;
+    public void setDataContainerName(String dataContainerName){
+        this.dataContainerName=dataContainerName;
     }
 
     /**

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java 
(original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java 
Thu Mar  1 11:24:17 2007
@@ -138,6 +138,13 @@
      * @throws IOException
      */
     public void deleteMapContainer(Object id,String containerName) throws 
IOException;
+    
+    /**
+     * Delete Map container
+     * @param id
+     * @throws IOException
+     */
+    public void deleteMapContainer(ContainerId id) throws IOException;
 
     /**
      * Get a Set of call MapContainer Ids
@@ -145,7 +152,7 @@
      * @return the set of ids
      * @throws IOException
      */
-    public Set getMapContainerIds() throws IOException;
+    public Set<ContainerId> getMapContainerIds() throws IOException;
 
     /**
      * Checks if a ListContainer exists in the default container
@@ -213,6 +220,12 @@
      */
     public void deleteListContainer(Object id,String containerName) throws 
IOException;
 
+    /**
+     * delete a list container
+     * @param id
+     * @throws IOException
+     */
+    public void deleteListContainer(ContainerId id) throws IOException;
 
     /**
      * Get a Set of call ListContainer Ids
@@ -220,7 +233,7 @@
      * @return the set of ids
      * @throws IOException
      */
-    public Set getListContainerIds() throws IOException;
+    public Set<ContainerId> getListContainerIds() throws IOException;
     
     /**
      * @return the maxDataFileLength

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
 Thu Mar  1 11:24:17 2007
@@ -22,11 +22,11 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.kaha.StoreLocation;
-import org.apache.activemq.kaha.impl.container.ContainerId;
 import org.apache.activemq.kaha.impl.data.Item;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexManager;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
 Thu Mar  1 11:24:17 2007
@@ -27,6 +27,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.RuntimeStoreException;
@@ -34,7 +35,6 @@
 import org.apache.activemq.kaha.StoreLocation;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.async.DataManagerFacade;
-import org.apache.activemq.kaha.impl.container.ContainerId;
 import org.apache.activemq.kaha.impl.container.ListContainerImpl;
 import org.apache.activemq.kaha.impl.container.MapContainerImpl;
 import org.apache.activemq.kaha.impl.data.DataManagerImpl;
@@ -218,12 +218,14 @@
     public void deleteMapContainer(Object id) throws IOException{
         deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
     }
+    
+    public void deleteMapContainer(Object id,String containerName) throws 
IOException{
+        ContainerId containerId = new ContainerId(id,containerName);
+        deleteMapContainer(containerId);
+    }
 
-    public synchronized void deleteMapContainer(Object id,String 
containerName) throws IOException{
+    public synchronized void deleteMapContainer(ContainerId containerId) 
throws IOException{
         initialize();
-        ContainerId containerId=new ContainerId();
-        containerId.setKey(id);
-        containerId.setDataContainerName(containerName);
         MapContainerImpl container=maps.remove(containerId);
         if(container!=null){
             container.clear();
@@ -232,12 +234,12 @@
         }
     }
 
-    public synchronized Set<Object> getMapContainerIds() throws IOException{
+    public synchronized Set<ContainerId> getMapContainerIds() throws 
IOException{
         initialize();
-        Set<Object> set = new HashSet<Object>();
+        Set<ContainerId> set = new HashSet<ContainerId>();
         for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
             ContainerId id = (ContainerId)i.next();
-            set.add(id.getKey());
+            set.add(id);
         }
         return set;
     }
@@ -286,12 +288,14 @@
     public void deleteListContainer(Object id) throws IOException{
         deleteListContainer(id,DEFAULT_CONTAINER_NAME);
     }
-
+    
     public synchronized void deleteListContainer(Object id,String 
containerName) throws IOException{
+        ContainerId containerId=new ContainerId(id,containerName);
+        deleteListContainer(containerId);
+    }
+
+    public synchronized void deleteListContainer(ContainerId containerId) 
throws IOException{
         initialize();
-        ContainerId containerId=new ContainerId();
-        containerId.setKey(id);
-        containerId.setDataContainerName(containerName);
         ListContainerImpl container=lists.remove(containerId);
         if(container!=null){
             listsContainer.removeRoot(container.getIndexManager(),containerId);
@@ -300,12 +304,12 @@
         }
     }
 
-    public synchronized Set<Object> getListContainerIds() throws IOException{
+    public synchronized Set<ContainerId> getListContainerIds() throws 
IOException{
         initialize();
-        Set<Object> set = new HashSet<Object>();
+        Set<ContainerId> set = new HashSet<ContainerId>();
         for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
             ContainerId id = (ContainerId)i.next();
-            set.add(id.getKey());
+            set.add(id);
         }
         return set;
     }
@@ -333,7 +337,7 @@
                if( isUseAsyncDataManager() ) {
                        AsyncDataManager t=new AsyncDataManager();
                        t.setDirectory(directory);
-                       t.setFilePrefix("data-"+name+"-");
+                       t.setFilePrefix("async-data-"+name+"-");
                        t.setMaxFileLength((int) maxDataFileLength);
                        t.start();
                    dm=new DataManagerFacade(t, name);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
 Thu Mar  1 11:24:17 2007
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.RuntimeStoreException;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
 Thu Mar  1 11:24:17 2007
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.RuntimeStoreException;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
 Thu Mar  1 11:24:17 2007
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.RuntimeStoreException;
@@ -69,7 +70,7 @@
                     throw new RuntimeException(e);
                 }
             }else{
-                this.index=new VMIndex();
+                this.index=new VMIndex(indexManager);
             }
         }
         index.setKeyMarshaller(keyMarshaller);
@@ -505,7 +506,7 @@
                 StoreLocation 
data=dataManager.storeDataItem(valueMarshaller,value);
                 index.setValueData(data);
             }
-            IndexItem prev=indexList.getLast();
+            IndexItem prev=indexList.getLast();           
             prev=prev!=null?prev:indexList.getRoot();
             IndexItem next=indexList.getNextEntry(prev);
             prev.setNextItem(index.getOffset());

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
 Thu Mar  1 11:24:17 2007
@@ -34,6 +34,7 @@
     private RandomAccessFile randomAcessFile;
     private Object writerData;
     long length=0;
+    private boolean dirty;
 
     DataFile(File file,int number){
         this.file=file;
@@ -107,6 +108,15 @@
         */
        public synchronized void setWriterData(Object writerData) {
                this.writerData = writerData;
+        dirty=true;
        }
+    
+    public synchronized boolean isDirty() {
+        return dirty;
+    }
+    
+    public synchronized void setDirty(boolean value) {
+        this.dirty = value;
+    }
 
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
 Thu Mar  1 11:24:17 2007
@@ -57,7 +57,7 @@
     
     Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
     private String dataFilePrefix;
-
+   
     public DataManagerImpl(File dir, final String name){
         this.dir=dir;
         this.name=name;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
 Thu Mar  1 11:24:17 2007
@@ -96,9 +96,10 @@
 
        public synchronized void force(DataFile dataFile) throws IOException {
                // If our dirty marker was set.. then we need to sync
-               if( dataFile.getWriterData()!=null ) {
+               if( dataFile.getWriterData()!=null && dataFile.isDirty()) {
                        dataFile.getRandomAccessFile().getFD().sync();
                dataFile.setWriterData(null);
+            dataFile.setDirty(false);
                }
        }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
 Thu Mar  1 11:24:17 2007
@@ -65,6 +65,10 @@
     public synchronized IndexItem getLast(){
         if(size==0)
             return null;
+        if(last!=null){
+            last.next=null;
+            last.setNextItem(IndexItem.POSITION_NOT_SET);
+        }
         return last;
     }
 
@@ -323,6 +327,7 @@
             return;
         if(e==last||e.equals(last)){
             if(size>1){
+                last = (IndexItem)refreshEntry(last);
                 last=getPrevEntry(last);
             }else{
                 last=null;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
 Thu Mar  1 11:24:17 2007
@@ -45,6 +45,7 @@
     private long length=0;
     private IndexItem firstFree;
     private IndexItem lastFree;
+    private boolean dirty;
 
     public IndexManager(File directory,String name,String mode,DataManager 
redoLog) throws IOException{
         this.directory=directory;
@@ -76,10 +77,12 @@
             lastFree.setNextItem(item.getOffset());
         }
         writer.updateIndexes(item);
+        dirty=true;
     }
 
     public synchronized void storeIndex(IndexItem index) throws IOException{
         writer.storeItem(index);
+        dirty=true;
     }
 
     public synchronized void updateIndexes(IndexItem index) throws IOException{
@@ -88,10 +91,12 @@
         }catch(Throwable e){
             log.error(name+" error updating indexes ",e);
         }
+        dirty=true;
     }
 
     public synchronized void redo(final RedoStoreIndexItem redo) throws 
IOException{
         writer.redoStoreItem(redo);
+        dirty=true;
     }
 
     public synchronized IndexItem createNewIndex() throws IOException{
@@ -113,8 +118,9 @@
     }
 
     public synchronized void force() throws IOException{
-        if(indexFile!=null){
+        if(indexFile!=null && dirty){
             indexFile.getFD().sync();
+            dirty=false;
         }
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
 Thu Mar  1 11:24:17 2007
@@ -14,10 +14,14 @@
 
 package org.apache.activemq.kaha.impl.index;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.impl.container.MapContainerImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Index implementation using a HashMap
@@ -25,9 +29,13 @@
  * @version $Revision: 1.2 $
  */
 public class VMIndex implements Index{
-
+    private static final Log log=LogFactory.getLog(VMIndex.class);
+    private IndexManager indexManager;
     private Map<Object,StoreEntry> map=new HashMap<Object,StoreEntry>();
 
+    public VMIndex(IndexManager manager) {
+        this.indexManager= manager;
+    }
     /**
      * 
      * @see org.apache.activemq.kaha.impl.index.Index#clear()
@@ -47,10 +55,20 @@
 
     /**
      * @param key
+     * @return store entry
      * @see 
org.apache.activemq.kaha.impl.index.Index#removeKey(java.lang.Object)
      */
     public StoreEntry remove(Object key){
-       return  map.remove(key);
+        StoreEntry result =   map.remove(key);
+        if (result != null) {
+            try{
+                result=indexManager.refreshIndex((IndexItem)result);
+            }catch(IOException e){
+                log.error("Failed to refresh entry",e);
+               throw new RuntimeException("Failed to refresh entry");
+            }
+        }
+        return result;
     }
 
     /**
@@ -68,7 +86,16 @@
      * @return the entry
      */
     public StoreEntry get(Object key){
-        return map.get(key);
+        StoreEntry result =  map.get(key);
+        if (result != null) {
+            try{
+                result=indexManager.refreshIndex((IndexItem)result);
+            }catch(IOException e){
+                log.error("Failed to refresh entry",e);
+               throw new RuntimeException("Failed to refresh entry");
+            }
+        }
+        return result;
     }
 
     /**

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
 Thu Mar  1 11:24:17 2007
@@ -84,7 +84,7 @@
     private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = 
new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
     
     private AsyncDataManager asyncDataManager;
-    private ReferenceStoreAdapter referenceStoreAdapter;
+    private KahaReferenceStoreAdapter referenceStoreAdapter;
        private TaskRunnerFactory taskRunnerFactory; 
     private WireFormat wireFormat = new OpenWireFormat();
 
@@ -106,7 +106,7 @@
 
        private Runnable periodicCleanupTask;
        private boolean deleteAllMessages;
-       private File directory = new File(IOHelper.getDefaultDataDirectory() + 
"/quick");
+       private File directory = new File(IOHelper.getDefaultDataDirectory() + 
"/amq");
 
 
     
@@ -242,7 +242,9 @@
             checkpointTask.wakeup();
             
             if (sync) {
-                log.debug("Waitng for checkpoint to complete.");
+                if(log.isDebugEnabled()){
+                    log.debug("Waitng for checkpoint to complete.");
+                }
                 latch.await();
             }
         }
@@ -264,7 +266,10 @@
         }        
         try {
 
-            log.debug("Checkpoint started.");
+            if(log.isDebugEnabled()){
+                log.debug("Checkpoint started.");
+            }
+            referenceStoreAdapter.sync();
             Location newMark = null;
 
             Iterator<AMQMessageStore> iterator = queues.values().iterator();
@@ -287,7 +292,9 @@
 
             try {
                 if (newMark != null) {
-                    log.debug("Marking journal at: " + newMark);
+                    if(log.isDebugEnabled()){
+                        log.debug("Marking journal at: " + newMark);
+                    }
                     asyncDataManager.setMark(newMark, false);
                     writeTraceMessage("CHECKPOINT "+new Date(), true);
                 }
@@ -296,17 +303,12 @@
                 log.error("Failed to mark the Journal: " + e, e);
             }
     
-//                if (referenceStoreAdapter instanceof 
JDBCReferenceStoreAdapter) {
-//                    // We may be check pointing more often than the 
checkpointInterval if under high use
-//                    // But we don't want to clean up the db that often.
-//                    long now = System.currentTimeMillis();
-//                    if( now > lastCleanup+checkpointInterval ) {
-//                        lastCleanup = now;
-//                        ((JDBCReferenceStoreAdapter) 
referenceStoreAdapter).cleanup();
-//                    }
-//                }
-
-            log.debug("Checkpoint done.");
+            if(log.isDebugEnabled()){
+                log.debug("Checkpoint done.");
+            }
+        }
+        catch(IOException e) {
+            log.error("Failed to sync reference store",e);
         }
         finally {
             latch.countDown();
@@ -603,7 +605,7 @@
                return manager;
        }
     
-    protected ReferenceStoreAdapter createReferenceStoreAdapter() throws 
IOException {
+    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws 
IOException {
        KahaReferenceStoreAdapter adaptor = new 
KahaReferenceStoreAdapter(directory); 
                return adaptor;
        }
@@ -627,9 +629,7 @@
        public ReferenceStoreAdapter getReferenceStoreAdapter() {
                return referenceStoreAdapter;
        }
-       public void setReferenceStoreAdapter(ReferenceStoreAdapter 
referenceStoreAdapter) {
-               this.referenceStoreAdapter = referenceStoreAdapter;
-       }
+       
 
        public TaskRunnerFactory getTaskRunnerFactory() {
                return taskRunnerFactory;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
 Thu Mar  1 11:24:17 2007
@@ -31,9 +31,11 @@
 
     protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
+    protected KahaReferenceStoreAdapter adapter;
     protected StoreEntry batchEntry=null;
 
-    public KahaReferenceStore(MapContainer container,ActiveMQDestination 
destination) throws IOException{
+    public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer 
container,ActiveMQDestination destination) throws IOException{
+        this.adapter = adapter;
         this.messageContainer=container;
         this.destination=destination;
     }
@@ -109,10 +111,10 @@
         return result.data;
     }
 
-    public void addReferenceFileIdsInUse(Set<Integer> rc){
+    public void addReferenceFileIdsInUse(){
         for(StoreEntry 
entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             ReferenceRecord 
msg=(ReferenceRecord)messageContainer.getValue(entry);
-            rc.add(msg.data.getFileId());
+            addInterest(msg);
         }
     }
 
@@ -172,10 +174,10 @@
     }
     
     void removeInterest(ReferenceRecord rr) {
-        
+        adapter.removeInterestInRecordFile(rr.data.getFileId());
     }
     
     void addInterest(ReferenceRecord rr) {
-        
+        adapter.addInterestInRecordFile(rr.data.getFileId());
     }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
 Thu Mar  1 11:24:17 2007
@@ -17,34 +17,39 @@
  */
 package org.apache.activemq.store.kahadaptor;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
-import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.MessageIdMarshaller;
-import org.apache.activemq.kaha.MessageMarshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ReferenceStore;
 import org.apache.activemq.store.ReferenceStoreAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
-
-       private MapContainer<Integer, Integer> fileReferences;
+    private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
+   private static final String STORE_STATE = "store-state";
+   private static final String RECORD_REFERENCES = "record-references";
+    private MapContainer stateMap;
+       private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
+    private boolean storeValid;
 
        public KahaReferenceStoreAdapter(File dir) throws IOException {
                super(dir);
@@ -59,22 +64,63 @@
     }
     
     @Override
-    public void start() throws Exception {
-       super.start();
-       
+    public void start() throws Exception{
+        super.start();
         Store store=getStore();
-        fileReferences=store.getMapContainer("file-references");
-        fileReferences.setKeyMarshaller(new IntegerMarshaller());
-        fileReferences.setValueMarshaller(new IntegerMarshaller());
-        fileReferences.load();        
+        boolean empty=store.getMapContainerIds().isEmpty();
+        stateMap=store.getMapContainer("state",STORE_STATE);
+        stateMap.load();
+        if(!empty){
+            
+            AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
+            if(status!=null){
+                storeValid=status.get();
+            }
+           
+            if(storeValid){
+                if(stateMap.containsKey(RECORD_REFERENCES)){
+                    recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
+                }
+            }else {
+                /*
+                log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
+                Set<ContainerId> set = store.getListContainerIds();
+                for (ContainerId cid:set) {
+                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
+                        store.deleteListContainer(cid);
+                    }
+                }
+                set = store.getMapContainerIds();
+                for (ContainerId cid:set) {
+                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
+                        store.deleteMapContainer(cid);
+                    }
+                }
+                */
+                buildReferenceFileIdsInUse();
+            }
+            
+        }
+        stateMap.put(STORE_STATE,new AtomicBoolean());
+    }
+    
+    @Override
+    public void stop() throws Exception {
+        stateMap.put(RECORD_REFERENCES,recordReferences);
+        stateMap.put(STORE_STATE,new AtomicBoolean(true));
+        super.stop();        
     }
     
     
+    public boolean isStoreValid() {
+        return storeValid;
+    }
+    
 
        public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
                ReferenceStore rc=(ReferenceStore)queues.get(destination);
         if(rc==null){
-            rc=new KahaReferenceStore(getMapReferenceContainer(destination,"queue-data"),destination);
+            rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
             messageStores.put(destination,rc);
 //            if(transactionStore!=null){
 //                rc=transactionStore.proxy(rc);
@@ -89,10 +135,10 @@
         if(rc==null){
             Store store=getStore();
             MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
-            MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
+            MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob");
             ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
             ackContainer.setMarshaller(new TopicSubAckMarshaller());
-            rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination);
+            rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
             messageStores.put(destination,rc);
 //            if(transactionStore!=null){
 //                rc=transactionStore.proxy(rc);
@@ -102,25 +148,26 @@
         return rc;
        }
 
-       public Set<Integer> getReferenceFileIdsInUse() throws IOException {
+       public void buildReferenceFileIdsInUse() throws IOException {
                
-               Set<Integer> rc = new HashSet<Integer>();
+        recordReferences = new HashMap<Integer,AtomicInteger>();
                
                Set<ActiveMQDestination> destinations = getDestinations();
                for (ActiveMQDestination destination : destinations) {
                        if( destination.isQueue() ) {
                                KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
-                               store.addReferenceFileIdsInUse(rc);
+                               store.addReferenceFileIdsInUse();
                        } else {
                                KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
-                               store.addReferenceFileIdsInUse(rc);
+                               store.addReferenceFileIdsInUse();
                        }
-               }
-               
-               return rc;
-               
+        }              
        }
     
+    public void sync() throws IOException {
+        getStore().force();
+    }
+    
     protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
         Store store=getStore();
         MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
@@ -128,6 +175,33 @@
         container.setValueMarshaller(new ReferenceRecordMarshaller());        
         container.load();
         return container;
+    }
+    
+    synchronized void addInterestInRecordFile(int recordNumber) {
+        Integer key = new Integer(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr == null) {
+            rr = new AtomicInteger();
+            recordReferences.put(key,rr);
+        }
+        rr.incrementAndGet();
+    }
+    
+    synchronized void removeInterestInRecordFile(int recordNumber) {
+        Integer key = new Integer(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr != null && rr.decrementAndGet() <= 0) {
+            recordReferences.remove(key);
+        }
+    }
+
+    /**
+     * @return
+     * @throws IOException
+     * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
+     */
+    public Set<Integer> getReferenceFileIdsInUse() throws IOException{
+        return recordReferences.keySet();
     }
 
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Thu Mar  1 11:24:17 2007
@@ -86,8 +86,12 @@
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
                     if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ref.getAckEntry());
-                        messageContainer.remove(tsa.getMessageEntry());
+                        StoreEntry entry = ref.getAckEntry();
+                        entry = ackContainer.refresh(entry);
+                        ackContainer.remove(entry);
+                        entry = tsa.getMessageEntry();
+                        entry =messageContainer.refresh(entry);
+                        messageContainer.remove(entry);
                     }else{
                         ackContainer.update(ref.getAckEntry(),tsa);
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Mar  1 11:24:17 2007
@@ -39,9 +39,9 @@
     private Store store;
     protected Map subscriberMessages=new ConcurrentHashMap();
 
-    public KahaTopicReferenceStore(Store store,MapContainer messageContainer,ListContainer ackContainer,
+    public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
             MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        super(messageContainer,destination);
+        super(adapter,messageContainer,destination);
         this.store=store;
         this.ackContainer=ackContainer;
         subscriberContainer=subsContainer;
@@ -97,18 +97,18 @@
         return result.data;
     }
 
-    public void addReferenceFileIdsInUse(Set<Integer> rc){
+    public void addReferenceFileIdsInUse(){
         for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
             TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
             if(subAck.getCount()>0){
                 ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
-                rc.add(rr.data.getFileId());
+                addInterest(rr);
             }
         }
     }
 
     protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
-        ListContainer container=store.getListContainer(key,"topic-subs");
+        ListContainer container=store.getListContainer(key,"topic-subs-references");
         Marshaller marshaller=new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
         TopicSubContainer tsc=new TopicSubContainer(container);
@@ -129,11 +129,15 @@
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
                     if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ref.getAckEntry());
-                        ReferenceRecord rr = messageContainer.get(messageId);
-                        if (rr != null) {
-                        messageContainer.remove(tsa.getMessageEntry());
-                        removeInterest(rr);
+                        StoreEntry entry=ref.getAckEntry();
+                        entry=ackContainer.refresh(entry);
+                        ackContainer.remove(entry);
+                        ReferenceRecord rr=messageContainer.get(messageId);
+                        if(rr!=null){
+                            entry=tsa.getMessageEntry();
+                            entry=messageContainer.refresh(entry);
+                            messageContainer.remove(entry);
+                            removeInterest(rr);
                         }
                     }else{
                         ackContainer.update(ref.getAckEntry(),tsa);
@@ -261,7 +265,7 @@
                 }
             }
         }
-        store.deleteListContainer(key,"topic-subs");
+        store.deleteListContainer(key,"topic-subs-references");
     }
 
     protected String getSubscriptionKey(String clientId,String subscriberName){

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java Thu Mar  1 11:24:17 2007
@@ -37,10 +37,23 @@
     protected MapContainer container;
     protected Map testMap;
     protected static final int COUNT = 10;
+    
+    public void testBasicAllocations() throws Exception{
+        String key = "key";
+        Object value = testMap;
+        MapContainer test = store.getMapContainer("test","test");
+        test.put(key,value);
+        store.close();
+        store = getStore();
+        assertTrue(store.getMapContainerIds().isEmpty()==false);
+        test = store.getMapContainer("test","test");
+        assertEquals(value,test.get(key));
+        
+    }
     /*
      * Test method for 'org.apache.activemq.kaha.MapContainer.size()'
      */
-    public void XtestSize() throws Exception {
+    public void testSize() throws Exception {
         container.putAll(testMap);
         assertTrue(container.size()==testMap.size());
     }
@@ -48,14 +61,14 @@
     /*
      * Test method for 'org.apache.activemq.kaha.MapContainer.isEmpty()'
      */
-    public void XtestIsEmpty() throws Exception {
+    public void testIsEmpty() throws Exception {
        assertTrue(container.isEmpty());
     }
 
     /*
      * Test method for 'org.apache.activemq.kaha.MapContainer.clear()'
      */
-    public void XtestClear() throws Exception {
+    public void testClear() throws Exception {
         container.putAll(testMap);
         assertTrue(container.size()==testMap.size());
         container.clear();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java Thu Mar  1 11:24:17 2007
@@ -100,16 +100,7 @@
         assertFalse(store.doesMapContainerExist(containerId));
     }
 
-    /*
-     * Test method for 'org.apache.activemq.kaha.Store.getMapContainerIds()'
-     */
-    public void testGetMapContainerIds()throws Exception {
-        String containerId = "test";
-        MapContainer container = store.getMapContainer(containerId);
-        Set set = store.getMapContainerIds();
-        assertTrue(set.contains(containerId));
-    }
-
+    
     
 
     /*
@@ -139,16 +130,7 @@
         assertFalse(store.doesListContainerExist(containerId));
     }
 
-    /*
-     * Test method for 'org.apache.activemq.kaha.Store.getListContainerIds()'
-     */
-    public void testGetListContainerIds()throws Exception {
-        String containerId = "test";
-        ListContainer container = store.getListContainer(containerId);
-        Set set = store.getListContainerIds();
-        assertTrue(set.contains(containerId));
-    }
-    
+        
     public void testBasicAllocations() throws Exception{
         Map testMap = new HashMap();
         int count = 1000;


Reply via email to