Author: rajdavies Date: Thu Nov 30 05:46:08 2006 New Revision: 480924 URL: http://svn.apache.org/viewvc?view=rev&rev=480924 Log: log which Persistence Adaptor we are using
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Nov 30 05:46:08 2006 @@ -385,7 +385,7 @@ startDestinations(); addShutdownHook(); - log.info("Using Persistence Adaptor " + getPersistenceAdapter()); + log.info("Using Persistence Adaptor: " + getPersistenceAdapter()); if (deleteAllMessagesOnStartup) { deleteAllMessages(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu Nov 30 05:46:08 2006 @@ -59,7 +59,7 @@ } nonPersistent.start(); persistent.start(); - pendingCount=persistent.size(); + pendingCount=persistent.size() + nonPersistent.size(); } public synchronized void stop() throws Exception{ @@ -87,12 +87,28 @@ } } } + + public void addMessageFirst(MessageReference node) throws Exception{ + if(node!=null){ + Message msg=node.getMessage(); + if(started){ + pendingCount++; + if(!msg.isPersistent()){ + nonPersistent.addMessageFirst(node); + } + } + if(msg.isPersistent()){ + persistent.addMessageFirst(node); + } + } + } public void clear(){ pendingCount=0; } public synchronized boolean hasNext(){ + boolean result=pendingCount>0; if(result){ try{ @@ -107,7 +123,8 @@ } public synchronized MessageReference next(){ - return currentCursor!=null?currentCursor.next():null; + MessageReference result = currentCursor!=null?currentCursor.next():null; + return result; } public synchronized void remove(){ @@ -118,6 +135,11 @@ } public void remove(MessageReference node){ + if (!node.isPersistent()) { + nonPersistent.remove(node); + }else { + persistent.remove(node); + } pendingCount--; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java Thu Nov 30 05:46:08 2006 @@ -88,5 +88,9 @@ ds.setCreateDatabase("create"); return ds; } + + public String toString(){ + return ""+dataSource; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Nov 30 05:46:08 2006 @@ -482,4 +482,8 @@ protected DatabaseLocker createDatabaseLocker() throws IOException { return new DefaultDatabaseLocker(getDataSource(), getStatements()); } + + public String toString(){ + return "JDBCPersistenceAdaptor("+super.toString()+")"; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Nov 30 05:46:08 2006 @@ -669,5 +669,9 @@ org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); } + + public String toString(){ + return "JournalPersistenceAdapator(" + longTermPersistence + ")"; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Thu Nov 30 05:46:08 2006 @@ -270,4 +270,8 @@ String name=dir.getAbsolutePath()+File.separator+"kahadb"; return name; } + + public String toString(){ + return "KahaPersistenceAdapter(" + getStoreName() +")"; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Thu Nov 30 05:46:08 2006 @@ -50,12 +50,16 @@ } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ - messageTable.put(message.getMessageId(),message); + synchronized(messageTable){ + messageTable.put(message.getMessageId(),message); + } } public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) throws IOException{ - messageTable.put(messageId,messageRef); + synchronized(messageTable){ + messageTable.put(messageId,messageRef); + } } public Message getMessage(MessageId identity) throws IOException{ @@ -67,11 +71,16 @@ } public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{ - messageTable.remove(ack.getLastMessageId()); + removeMessage(ack.getLastMessageId()); } public void removeMessage(MessageId msgId) throws IOException{ - messageTable.remove(msgId); + synchronized(messageTable){ + messageTable.remove(msgId); + if(lastBatchId!=null&lastBatchId.equals(msgId)){ + lastBatchId=null; + } + } } public void recover(MessageRecoveryListener listener) throws Exception{ @@ -96,7 +105,9 @@ } public void removeAllMessages(ConnectionContext context) throws IOException{ - messageTable.clear(); + synchronized(messageTable){ + messageTable.clear(); + } } public ActiveMQDestination getDestination(){ @@ -104,7 +115,9 @@ } public void delete(){ - messageTable.clear(); + synchronized(messageTable){ + messageTable.clear(); + } } /** @@ -117,18 +130,16 @@ return messageTable.size(); } - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ synchronized(messageTable){ - boolean pastLackBatch=lastBatchId==null; - int count = 0; + int count=0; for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ Map.Entry entry=(Entry)iter.next(); if(pastLackBatch){ count++; Object msg=entry.getValue(); - lastBatchId = (MessageId)entry.getKey(); + lastBatchId=(MessageId)entry.getKey(); if(msg.getClass()==String.class){ listener.recoverMessageReference((String)msg); }else{ @@ -143,6 +154,6 @@ } public void resetBatching(){ - lastBatchId = null; + lastBatchId=null; } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Thu Nov 30 05:46:08 2006 @@ -154,4 +154,8 @@ */ public void setUsageManager(UsageManager usageManager) { } + + public String toString(){ + return "MemoryPersistenceAdapter"; + } }