Author: rajdavies
Date: Thu Apr 20 13:07:32 2006
New Revision: 395689
URL: http://svn.apache.org/viewcvs?rev=395689&view=rev
Log:
more tuning - and ensure data files have more meaningful names
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
Thu Apr 20 13:07:32 2006
@@ -2,36 +2,36 @@
import java.io.IOException;
import java.util.Set;
-
/**
-* A Store is holds persistent containers
-*
-* @version $Revision: 1.2 $
-*/
+ * A Store holds persistent containers
+ *
+ * @version $Revision: 1.2 $
+ */
public interface Store{
-
/**
* close the store
+ *
* @throws IOException
*/
public void close() throws IOException;
-
-
+
/**
* Force all writes to disk
+ *
* @throws IOException
*/
public void force() throws IOException;
-
+
/**
* empty all the contents of the store
+ *
* @throws IOException
*/
public void clear() throws IOException;
-
-
+
/**
* delete the store
+ *
* @return true if the delete was successful
* @throws IOException
*/
@@ -39,22 +39,35 @@
/**
* Checks if a MapContainer exists
+ *
* @param id
* @return new MapContainer
- * @throws IOException
+ * @throws IOException
*/
public boolean doesMapContainerExist(Object id) throws IOException;
/**
* Get a MapContainer with the given id - the MapContainer is created if
needed
+ *
* @param id
* @return container for the associated id or null if it doesn't exist
- * @throws IOException
+ * @throws IOException
*/
- public MapContainer getMapContainer(Object id) throws IOException;
+ public MapContainer getMapContainer(Object id) throws IOException;
+
+ /**
+ * Get a MapContainer with the given id - the MapContainer is created if needed
+ *
+ * @param id
+ * @param containerName
+ * @return container for the associated id, created if it did not already exist
+ * @throws IOException
+ */
+ public MapContainer getMapContainer(Object id,String containerName) throws IOException;
/**
* delete a container
+ *
* @param id
* @throws IOException
*/
@@ -62,40 +75,53 @@
/**
* Get a Set of call MapContainer Ids
+ *
* @return the set of ids
- * @throws IOException
+ * @throws IOException
*/
public Set getMapContainerIds() throws IOException;
-
+
/**
* Checks if a ListContainer exists
+ *
* @param id
* @return new MapContainer
- * @throws IOException
+ * @throws IOException
*/
public boolean doesListContainerExist(Object id) throws IOException;
- /**
- * Get a ListContainer with the given id and creates it if it doesn't exist
- * @param id
- * @return container for the associated id or null if it doesn't exist
- * @throws IOException
- */
- public ListContainer getListContainer(Object id) throws IOException;
-
- /**
- * delete a ListContainer
- * @param id
- * @throws IOException
- */
- public void deleteListContainer(Object id) throws IOException;
-
- /**
- * Get a Set of call ListContainer Ids
- * @return the set of ids
- * @throws IOException
- */
- public Set getListContainerIds() throws IOException;
-
-
+ /**
+ * Get a ListContainer with the given id and creates it if it doesn't exist
+ *
+ * @param id
+ * @return container for the associated id or null if it doesn't exist
+ * @throws IOException
+ */
+ public ListContainer getListContainer(Object id) throws IOException;
+
+ /**
+ * Get a ListContainer with the given id and creates it if it doesn't exist
+ *
+ * @param id
+ * @param containerName
+ * @return container for the associated id, created if it did not already exist
+ * @throws IOException
+ */
+ public ListContainer getListContainer(Object id,String containerName) throws IOException;
+
+ /**
+ * delete a ListContainer
+ *
+ * @param id
+ * @throws IOException
+ */
+ public void deleteListContainer(Object id) throws IOException;
+
+ /**
+ * Get a Set of all ListContainer Ids
+ *
+ * @return the set of ids
+ * @throws IOException
+ */
+ public Set getListContainerIds() throws IOException;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
Thu Apr 20 13:07:32 2006
@@ -30,18 +30,22 @@
protected IndexLinkedList list;
protected IndexManager indexManager;
protected DataManager dataManager;
- protected Object id;
+ protected ContainerId containerId;
protected boolean loaded=false;
protected boolean closed=false;
protected final Object mutex=new Object();
- protected BaseContainerImpl(Object id,IndexItem root,IndexManager
indexManager,DataManager dataManager){
- this.id=id;
+ protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager
indexManager,DataManager dataManager){
+ this.containerId=id;
this.root=root;
this.indexManager=indexManager;
this.dataManager=dataManager;
this.list=new IndexLinkedList(root);
}
+
+ ContainerId getContainerId(){
+ return containerId;
+ }
public abstract void unload();
@@ -81,7 +85,7 @@
*/
public final Object getId(){
checkClosed();
- return id;
+ return containerId.getKey();
}
protected final void expressDataInterest() throws IOException{
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java?rev=395689&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
Thu Apr 20 13:07:32 2006
@@ -0,0 +1,80 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+/**
+ * Used by RootContainers
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ContainerId implements Externalizable{
+    private static final long serialVersionUID=-8883779541021821943L;
+    /** Identifier supplied by the caller; the only field used by equals and hashCode. */
+    private Object key;
+    /** Prefix handed to the DataManager that stores this container's data. */
+    private String dataContainerPrefix;
+
+    /** @return Returns the dataContainerPrefix. */
+    public String getDataContainerPrefix(){
+        return dataContainerPrefix;
+    }
+
+    /** @param dataContainerPrefix The dataContainerPrefix to set. */
+    public void setDataContainerPrefix(String dataContainerPrefix){
+        this.dataContainerPrefix=dataContainerPrefix;
+    }
+
+    /** @return Returns the key. */
+    public Object getKey(){
+        return key;
+    }
+
+    /** @param key The key to set. */
+    public void setKey(Object key){
+        this.key=key;
+    }
+
+    /** @return the key's hash code, or 0 while no key has been set */
+    public int hashCode(){
+        return key==null?0:key.hashCode();
+    }
+
+    /**
+     * Compares keys only; the data container prefix is ignored.
+     * @return true if obj is a ContainerId with an equal key (two unset keys are equal)
+     */
+    public boolean equals(Object obj){
+        if(!(obj instanceof ContainerId)){
+            return false; // instanceof is false for null as well
+        }
+        Object otherKey=((ContainerId) obj).key;
+        return otherKey==null?key==null:otherKey.equals(key);
+    }
+
+    /** Writes the prefix (as UTF, so it must be non-null) followed by the key. */
+    public void writeExternal(ObjectOutput out) throws IOException{
+        out.writeUTF(getDataContainerPrefix());
+        out.writeObject(key);
+    }
+
+    /** Restores the prefix and then the key, in the order writeExternal wrote them. */
+    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
+        dataContainerPrefix=in.readUTF();
+        key=in.readObject();
+    }
+}
\ No newline at end of file
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
Thu Apr 20 13:07:32 2006
@@ -65,6 +65,10 @@
}
}
}
+
+ public String getPrefix(){
+ return prefix;
+ }
DataFile findSpaceForData(DataItem item) throws IOException{
if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>MAX_FILE_LENGTH)){
@@ -168,7 +172,7 @@
DataFile dataFile=(DataFile) purgeList.get(i);
fileMap.remove(dataFile.getNumber());
boolean result=dataFile.delete();
- log.info("discarding data file "+dataFile+(result?"successful
":"failed"));
+ log.debug("discarding data file "+dataFile+(result?"successful
":"failed"));
}
}
@@ -183,6 +187,6 @@
private void removeDataFile(DataFile dataFile) throws IOException{
fileMap.remove(dataFile.getNumber());
boolean result=dataFile.delete();
- log.info("discarding data file "+dataFile+(result?"successful
":"failed"));
+ log.debug("discarding data file "+dataFile+(result?"successful
":"failed"));
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
Thu Apr 20 13:07:32 2006
@@ -34,13 +34,16 @@
* @version $Revision: 1.1.1.1 $
*/
public class KahaStore implements Store{
- DataManager rootData;
- DataManager containersData;
- IndexManager indexManager;
+ private static final String DEFAULT_CONTAINER_NAME = "data-container";
+ private File directory;
+ private DataManager rootData;
+ private DataManager defaultContainerManager;
+ private IndexManager indexManager;
private IndexRootContainer mapsContainer;
private IndexRootContainer listsContainer;
private Map lists=new ConcurrentHashMap();
private Map maps=new ConcurrentHashMap();
+ private Map dataManagers = new ConcurrentHashMap();
private boolean closed=false;
private String name;
private String mode;
@@ -58,7 +61,7 @@
if(initialized){
indexManager.close();
rootData.close();
- containersData.close();
+ defaultContainerManager.close();
}
}
}
@@ -67,7 +70,7 @@
if(initialized){
indexManager.force();
rootData.force();
- containersData.force();
+ defaultContainerManager.force();
}
}
@@ -90,7 +93,7 @@
clear();
boolean result=indexManager.delete();
result&=rootData.delete();
- result&=containersData.delete();
+ result&=defaultContainerManager.delete();
initialized=false;
return result;
}
@@ -101,22 +104,31 @@
}
public MapContainer getMapContainer(Object id) throws IOException{
+ return getMapContainer(id, DEFAULT_CONTAINER_NAME);
+ }
+
+ public synchronized MapContainer getMapContainer(Object id, String
dataContainerName) throws IOException{
initialize();
+
MapContainer result=(MapContainer) maps.get(id);
if(result==null){
- IndexItem root=mapsContainer.addRoot(id);
- result=new MapContainerImpl(id,root,indexManager,containersData);
- maps.put(id,result);
+ DataManager dm = getDataManager(dataContainerName);
+ ContainerId containerId = new ContainerId();
+ containerId.setKey(id);
+ containerId.setDataContainerPrefix(dataContainerName);
+ IndexItem root=mapsContainer.addRoot(containerId);
+ result=new MapContainerImpl(containerId,root,indexManager,dm);
+ maps.put(containerId.getKey(),result);
}
return result;
}
public void deleteMapContainer(Object id) throws IOException{
initialize();
- MapContainer container=(MapContainer) maps.remove(id);
+ MapContainerImpl container=(MapContainerImpl) maps.remove(id);
if(container!=null){
container.clear();
- mapsContainer.removeRoot(id);
+ mapsContainer.removeRoot(container.getContainerId());
}
}
@@ -131,22 +143,31 @@
}
public ListContainer getListContainer(Object id) throws IOException{
+ return getListContainer(id,DEFAULT_CONTAINER_NAME);
+ }
+
+ public synchronized ListContainer getListContainer(Object id, String
dataContainerName) throws IOException{
initialize();
+
ListContainer result=(ListContainer) lists.get(id);
if(result==null){
- IndexItem root=listsContainer.addRoot(id);
- result=new ListContainerImpl(id,root,indexManager,containersData);
- lists.put(id,result);
+ DataManager dm = getDataManager(dataContainerName);
+ ContainerId containerId = new ContainerId();
+ containerId.setKey(id);
+ containerId.setDataContainerPrefix(dataContainerName);
+ IndexItem root=listsContainer.addRoot(containerId);
+ result=new ListContainerImpl(containerId,root,indexManager,dm);
+ lists.put(containerId.getKey(),result);
}
return result;
}
public void deleteListContainer(Object id) throws IOException{
initialize();
- ListContainer container=(ListContainer) lists.remove(id);
+ ListContainerImpl container=(ListContainerImpl) lists.remove(id);
if(container!=null){
container.clear();
- listsContainer.removeRoot(id);
+ listsContainer.removeRoot(container.getContainerId());
}
}
@@ -164,12 +185,13 @@
protected synchronized void initialize() throws IOException{
if(!initialized){
initialized=true;
- File dir=new File(name);
- dir.mkdirs();
- File ifile=new File(dir,"kaha.idx");
+ directory=new File(name);
+ directory.mkdirs();
+ File ifile=new File(directory,"kaha.idx");
indexManager=new IndexManager(ifile,mode);
- rootData=new DataManager(dir,"roots-data");
- containersData=new DataManager(dir,"containers-data");
+ rootData=new DataManager(directory,"roots-data");
+ defaultContainerManager=new
DataManager(directory,DEFAULT_CONTAINER_NAME);
+ dataManagers.put(DEFAULT_CONTAINER_NAME, defaultContainerManager);
IndexItem mapRoot=new IndexItem();
IndexItem listRoot=new IndexItem();
if(indexManager.isEmpty()){
@@ -186,20 +208,34 @@
listsContainer=new
IndexRootContainer(listRoot,indexManager,rootData);
rootData.consolidateDataFiles();
for(Iterator i=mapsContainer.getKeys().iterator();i.hasNext();){
- Object key=i.next();
+ ContainerId key=(ContainerId) i.next();
+ DataManager dm = getDataManager(key.getDataContainerPrefix());
IndexItem root=mapsContainer.getRoot(key);
- BaseContainerImpl container=new
MapContainerImpl(key,root,indexManager,containersData);
+ BaseContainerImpl container=new
MapContainerImpl(key,root,indexManager,dm);
container.expressDataInterest();
- maps.put(key,container);
+ maps.put(key.getKey(),container);
}
for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){
- Object key=i.next();
+ ContainerId key=(ContainerId) i.next();
+ DataManager dm = getDataManager(key.getDataContainerPrefix());
IndexItem root=listsContainer.getRoot(key);
- BaseContainerImpl container=new
ListContainerImpl(key,root,indexManager,containersData);
+ BaseContainerImpl container=new
ListContainerImpl(key,root,indexManager,dm);
container.expressDataInterest();
- lists.put(key,container);
+ lists.put(key.getKey(),container);
+ }
+ for (Iterator i = dataManagers.values().iterator(); i.hasNext();){
+ DataManager dm = (DataManager) i.next();
+ dm.consolidateDataFiles();
}
- containersData.consolidateDataFiles();
}
+ }
+
+ protected DataManager getDataManager(String prefix){
+ DataManager dm = (DataManager) dataManagers.get(prefix);
+ if (dm == null){
+ dm = new DataManager(directory,prefix);
+ dataManagers.put(prefix,dm);
+ }
+ return dm;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
Thu Apr 20 13:07:32 2006
@@ -34,7 +34,7 @@
private static final Log log=LogFactory.getLog(ListContainerImpl.class);
protected Marshaller marshaller=new ObjectMarshaller();
- protected ListContainerImpl(Object id,IndexItem root,IndexManager
indexManager,DataManager dataManager)
+ protected ListContainerImpl(ContainerId id,IndexItem root,IndexManager
indexManager,DataManager dataManager)
throws IOException{
super(id,root,indexManager,dataManager);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
Thu Apr 20 13:07:32 2006
@@ -39,7 +39,7 @@
protected Marshaller keyMarshaller=new ObjectMarshaller();
protected Marshaller valueMarshaller=new ObjectMarshaller();
- protected MapContainerImpl(Object id,IndexItem root,IndexManager
indexManager,DataManager dataManager){
+ protected MapContainerImpl(ContainerId id,IndexItem root,IndexManager
indexManager,DataManager dataManager){
super(id,root,indexManager,dataManager);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
Thu Apr 20 13:07:32 2006
@@ -78,7 +78,7 @@
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue
destination) throws IOException{
MessageStore rc=(MessageStore) queues.get(destination);
if(rc==null){
- rc=new KahaMessageStore(getMapContainer(destination),destination);
+ rc=new
KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
messageStores.put(destination, rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
@@ -91,9 +91,9 @@
public synchronized TopicMessageStore
createTopicMessageStore(ActiveMQTopic destination) throws IOException{
TopicMessageStore rc=(TopicMessageStore) topics.get(destination);
if(rc==null){
- MapContainer messageContainer=getMapContainer(destination);
- MapContainer
subsContainer=getMapContainer(destination.toString()+"-Subscriptions");
- MapContainer
ackContainer=store.getMapContainer(destination.toString()+"-Acks");
+ MapContainer
messageContainer=getMapContainer(destination,"topic-data");
+ MapContainer
subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
+ MapContainer
ackContainer=store.getMapContainer(destination.toString(),"topic-acks");
ackContainer.setKeyMarshaller(new StringMarshaller());
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
rc=new
KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
@@ -114,7 +114,7 @@
public TransactionStore createTransactionStore() throws IOException{
if(transactionStore==null){
- MapContainer
container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME);
+ MapContainer
container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
container.setKeyMarshaller(new CommandMarshaller(wireFormat));
container.setValueMarshaller(new
TransactionMarshaller(wireFormat));
container.load();
@@ -155,8 +155,8 @@
this.useExternalMessageReferences=useExternalMessageReferences;
}
- protected MapContainer getMapContainer(Object id) throws IOException{
- MapContainer container=store.getMapContainer(id);
+ protected MapContainer getMapContainer(Object id,String containerName)
throws IOException{
+ MapContainer container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new StringMarshaller());
if(useExternalMessageReferences){
container.setValueMarshaller(new StringMarshaller());
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=395689&r1=395688&r2=395689&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Thu Apr 20 13:07:32 2006
@@ -59,7 +59,7 @@
ackContainer.put(id,new AtomicInteger(subscriberCount));
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
Object key=i.next();
- ListContainer container=store.getListContainer(key);
+ ListContainer
container=store.getListContainer(key,"durable-subs");
container.add(id);
}
super.addMessage(context,message);
@@ -163,7 +163,7 @@
}
protected void addSubscriberAckContainer(Object key) throws IOException{
- ListContainer container=store.getListContainer(key);
+ ListContainer container=store.getListContainer(key,"topic-subs");
Marshaller marshaller=new StringMarshaller();
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);