Author: rajdavies Date: Wed Nov 15 12:56:21 2006 New Revision: 475416 URL: http://svn.apache.org/viewvc?view=rev&rev=475416 Log: change Queue message store in Kaha store adaptor to use memory efficent list instead of Map containers
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=475416&r1=475415&r2=475416 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Wed Nov 15 12:56:21 2006 @@ -47,6 +47,7 @@ protected int offset=0; protected int maximumCacheSize=100; protected IndexItem lastCached; + protected boolean cacheEnabled = true; public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager, String indexType) throws IOException{ @@ -858,46 +859,51 @@ } protected void itemAdded(IndexItem item,int pos,Object value){ - int cachePosition=pos-offset; - // if pos is before the cache offset - // we need to clear the cache - if(pos<offset){ - clearCache(); - } - if(cacheList.isEmpty()){ - offset=pos; - cacheList.add(value); - lastCached=item; - }else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){ - cacheList.add(value); - lastCached=item; - }else if(cachePosition>=0&&cachePosition<=cacheList.size()){ - cacheList.add(cachePosition,value); - if(cacheList.size()>maximumCacheSize){ - itemRemoved(cacheList.size()-1); + if(cacheEnabled){ + int cachePosition=pos-offset; + // if pos is before the cache offset + // we need to clear the cache + if(pos<offset){ + clearCache(); + } + if(cacheList.isEmpty()){ + offset=pos; + cacheList.add(value); + lastCached=item; + }else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){ + cacheList.add(value); + lastCached=item; + }else if(cachePosition>=0&&cachePosition<=cacheList.size()){ + cacheList.add(cachePosition,value); + if(cacheList.size()>maximumCacheSize){ + itemRemoved(cacheList.size()-1); + } } } } protected void itemRemoved(int pos){ - int lastPosition=offset+cacheList.size()-1; - int cachePosition=pos-offset; - if(cachePosition>=0&&cachePosition<cacheList.size()){ - if(cachePosition==lastPosition){ - if(lastCached!=null){ - lastCached=indexList.getPrevEntry(lastCached); + if(cacheEnabled){ + int lastPosition=offset+cacheList.size()-1; + int cachePosition=pos-offset; + if(cachePosition>=0&&cachePosition<cacheList.size()){ + if(cachePosition==lastPosition){ + if(lastCached!=null){ + lastCached=indexList.getPrevEntry(lastCached); + } + } + cacheList.remove(pos); + if(cacheList.isEmpty()){ + clearCache(); } - } - cacheList.remove(pos); - if(cacheList.isEmpty()){ - clearCache(); } } } protected Object getCachedItem(int pos){ - int cachePosition=pos-offset; Object result=null; + if(cacheEnabled) { + int cachePosition=pos-offset; if(cachePosition>=0&&cachePosition<cacheList.size()){ result=cacheList.get(cachePosition); } @@ -928,6 +934,12 @@ } } } + }else { + IndexItem item=indexList.get(pos); + if(item!=null){ + result=getValue(item); + } + } return result; } @@ -980,6 +992,10 @@ */ public synchronized void setMaximumCacheSize(int maximumCacheSize){ this.maximumCacheSize=maximumCacheSize; + cacheEnabled = maximumCacheSize >= 0; + if (!cacheEnabled) { + clearCache(); + } } /** Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=475416&r1=475415&r2=475416 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Nov 15 12:56:21 2006 @@ -19,16 +19,17 @@ import java.io.IOException; import java.util.Iterator; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; -import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.kaha.ListContainer; +import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.util.LRUCache; /** * An implementation of [EMAIL PROTECTED] org.apache.activemq.store.MessageStore} which uses a JPS Container * @@ -36,50 +37,84 @@ */ public class KahaMessageStore implements MessageStore{ protected final ActiveMQDestination destination; - protected final MapContainer messageContainer; + protected final ListContainer messageContainer; + protected final LRUCache cache; - public KahaMessageStore(MapContainer container,ActiveMQDestination destination) throws IOException{ + public KahaMessageStore(ListContainer container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{ this.messageContainer=container; this.destination=destination; + this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false); + // populate the cache + StoreEntry entry=messageContainer.getFirst(); + int count = 0; + if(entry!=null){ + do{ + Message msg = (Message)messageContainer.get(entry); + cache.put(msg.getMessageId(),entry); + entry = messageContainer.getNext(entry); + count++; + }while(entry!=null && count < maximumCacheSize); + } } public Object getId(){ return messageContainer.getId(); } - public void addMessage(ConnectionContext context,Message message) throws IOException{ - messageContainer.put(message.getMessageId().toString(),message); + public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ + StoreEntry item = messageContainer.placeLast(message); + cache.put(message.getMessageId(),item); } - public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) + public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) throws IOException{ - messageContainer.put(messageId.toString(),messageRef); + throw new RuntimeException("Not supported"); } - public Message getMessage(MessageId identity) throws IOException{ - return (Message) messageContainer.get(identity.toString()); + public synchronized Message getMessage(MessageId identity) throws IOException{ + Message result=null; + StoreEntry entry=(StoreEntry)cache.remove(identity); + if(entry!=null){ + result = (Message)messageContainer.get(entry); + }else{ + + for(Iterator i=messageContainer.iterator();i.hasNext();){ + Message msg=(Message)i.next(); + if(msg.getMessageId().equals(identity)){ + result=msg; + break; + } + } + } + return result; } public String getMessageReference(MessageId identity) throws IOException{ - return (String) messageContainer.get(identity.toString()); + return null; } public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{ - messageContainer.remove(ack.getLastMessageId().toString()); + removeMessage(ack.getLastMessageId()); } - public void removeMessage(MessageId msgId) throws IOException{ - messageContainer.remove(msgId.toString()); + public synchronized void removeMessage(MessageId msgId) throws IOException{ + StoreEntry entry=(StoreEntry)cache.remove(msgId); + if(entry!=null){ + messageContainer.remove(entry); + }else{ + for(Iterator i=messageContainer.iterator();i.hasNext();){ + Message msg=(Message)i.next(); + if(msg.getMessageId().equals(msgId)){ + i.remove(); + break; + } + } + } } - public void recover(MessageRecoveryListener listener) throws Exception{ - for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){ - Object msg=(Object) iter.next(); - if(msg.getClass()==String.class){ - listener.recoverMessageReference((String) msg); - }else{ - listener.recoverMessage((Message) msg); - } + public synchronized void recover(MessageRecoveryListener listener) throws Exception{ + for(Iterator iter=messageContainer.iterator();iter.hasNext();){ + listener.recoverMessage((Message)iter.next()); } listener.finished(); } @@ -88,16 +123,18 @@ public void stop() {} - public void removeAllMessages(ConnectionContext context) throws IOException{ + public synchronized void removeAllMessages(ConnectionContext context) throws IOException{ messageContainer.clear(); + cache.clear(); } public ActiveMQDestination getDestination(){ return destination; } - public void delete(){ + public synchronized void delete(){ messageContainer.clear(); + cache.clear(); } /** Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=475416&r1=475415&r2=475416 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Wed Nov 15 12:56:21 2006 @@ -1,19 +1,15 @@ /** * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.store.kahadaptor; @@ -59,6 +55,7 @@ private boolean useExternalMessageReferences; private OpenWireFormat wireFormat=new OpenWireFormat(); private long maxDataFileLength=32*1024*1024; + private int maximumDestinationCacheSize=2000; private String indexType=IndexTypes.DISK_INDEX; private File dir; private Store theStore; @@ -68,6 +65,8 @@ dir.mkdirs(); } this.dir=dir; + wireFormat.setCacheEnabled(false); + wireFormat.setTightEncodingEnabled(true); } public Set getDestinations(){ @@ -89,7 +88,7 @@ public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ MessageStore rc=(MessageStore)queues.get(destination); if(rc==null){ - rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination); + rc=new KahaMessageStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize); messageStores.put(destination,rc); if(transactionStore!=null){ rc=transactionStore.proxy(rc); @@ -185,10 +184,11 @@ container.load(); return container; } - + protected ListContainer getListContainer(Object id,String containerName) throws IOException{ Store store=getStore(); ListContainer container=store.getListContainer(id,containerName); + container.setMaximumCacheSize(0); if(useExternalMessageReferences){ container.setMarshaller(new StringMarshaller()); }else{ @@ -199,9 +199,7 @@ } /** - * @param usageManager - * The UsageManager that is controlling the broker's memory - * usage. + * @param usageManager The UsageManager that is controlling the broker's memory usage. */ public void setUsageManager(UsageManager usageManager){ } @@ -214,8 +212,7 @@ } /** - * @param maxDataFileLength - * the maxDataFileLength to set + * @param maxDataFileLength the maxDataFileLength to set * * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" */ @@ -235,6 +232,20 @@ */ public void setIndexType(String indexType){ this.indexType=indexType; + } + + /** + * @return the maximumDestinationCacheSize + */ + public int getMaximumDestinationCacheSize(){ + return this.maximumDestinationCacheSize; + } + + /** + * @param maximumDestinationCacheSize the maximumDestinationCacheSize to set + */ + public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){ + this.maximumDestinationCacheSize=maximumDestinationCacheSize; } protected synchronized Store getStore() throws IOException{ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java?view=diff&rev=475416&r1=475415&r2=475416 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUCache.java Wed Nov 15 12:56:21 2006 @@ -37,6 +37,24 @@ public LRUCache(){ super(1000,0.75f,true); } + + /** + * Constructs an empty <tt>LRUCache</tt> instance with the + * specified initial capacity, maximumCacheSize,load factor and ordering mode. + * + * @param initialCapacity the initial capacity. + * @param maximumCacheSize + * @param loadFactor the load factor. + * @param accessOrder the ordering mode - <tt>true</tt> for + * access-order, <tt>false</tt> for insertion-order. + * @throws IllegalArgumentException if the initial capacity is negative + * or the load factor is nonpositive. + */ + + public LRUCache(int initialCapacity,int maximumCacheSize,float loadFactor, boolean accessOrder) { + super(initialCapacity,loadFactor,accessOrder); + this.maxCacheSize = maximumCacheSize; + }