Author: gtully
Date: Wed Aug 31 13:07:57 2011
New Revision: 1163613

URL: http://svn.apache.org/viewvc?rev=1163613&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2868 - rework to remove sync on 
transaction completion, cursor updates are now stacked so that they ocurr in 
order, independent of thread execution after waiting for the journal to 
complete a write. This ensures that the cursors are updates in the same order 
as the index while still working wo the index lock. TransactedConsumerTest 
shows horizontal scaling now works better with transactions. Reworked 
metadata.lastUpdate to always work with the existing index lock rather than 
reaquire, this may help with spurious gc journal data file issue on windows - 
https://issues.apache.org/jira/browse/AMQ-3470

Modified:
    
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
    
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
    activemq/trunk/activemq-core/pom.xml
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java

Modified: 
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- 
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
 (original)
+++ 
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
 Wed Aug 31 13:07:57 2011
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.camel;
 
-import java.io.File;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.Connection;
 import javax.jms.MessageProducer;
@@ -28,7 +27,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.Wait;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -88,9 +87,12 @@ public class TransactedConsumeTest exten
 
         brokerService.setAdvisorySupport(false);
         brokerService.setDataDirectory("target/data");
-        AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
-        amq.setDirectory(new File("target/data"));
-        brokerService.setPersistenceAdapter(amq);
+        //AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
+        //amq.setDirectory(new File("target/data"));
+        //brokerService.setPersistenceAdapter(amq);
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = 
(KahaDBPersistenceAdapter)
+                brokerService.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
         brokerService.addConnector("tcp://localhost:61616");
         return brokerService;
     }

Modified: 
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- 
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
 (original)
+++ 
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
 Wed Aug 31 13:07:57 2011
@@ -28,22 +28,17 @@
     <context:annotation-config/>
 
     <bean id="vhfBatchListenerJMSConnectionFactory" 
class="org.apache.activemq.ActiveMQConnectionFactory">
-        <property name="brokerURL" 
value="tcp://localhost:61616?jms.prefetchPolicy.all=1000"/>
+        <property name="brokerURL" 
value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
     </bean>
 
     <bean id="vhfBatchListenerPooledConnectionFactory" 
class="org.apache.activemq.pool.PooledConnectionFactory">
-        <!-- match maxConnections to the number of routes that share the  
connection factory -->
-        <property name="maxConnections" value="2"/>
-        <!-- match maximumActive (which is active sessions) to num routes *  
concurrentConsumers in the MLC -->
-        <property name="maximumActive" value="20"/>
+        <!-- match maxConnections to the number of routes that share the 
connection factory -->
+        <property name="maxConnections" value="10"/>
+        <!-- match maximumActive (which is active sessions) >=  
concurrentConsumers in the MLC -->
+        <property name="maximumActive" value="1"/>
         <property name="connectionFactory" 
ref="vhfBatchListenerJMSConnectionFactory"/>
     </bean>
 
-    <!-- bean id="vhfBatchListenerSingleConnectionFactory" 
class="org.springframework.jms.connection.SingleConnectionFactory">
-       <property name="reconnectOnException" value="true" />
-       <property name="targetConnectionFactory" 
ref="vhfBatchListenerJMSConnectionFactory" />
-   </bean -->
-
     <!-- JMS Transaction manager -->
     <bean id="vhfBatchListenerJMSTransactionManager" 
class="org.springframework.jms.connection.JmsTransactionManager">
         <property name="connectionFactory" 
ref="vhfBatchListenerPooledConnectionFactory"/>
@@ -53,9 +48,8 @@
     <bean id="vhfBatchListenerJMSConfig" 
class="org.apache.camel.component.jms.JmsConfiguration">
         <property name="connectionFactory" 
ref="vhfBatchListenerPooledConnectionFactory"/>
         <property name="transactionManager" 
ref="vhfBatchListenerJMSTransactionManager"/>
-        <property name="receiveTimeout" value="20000" />
         <property name="transacted" value="true"/>
-        <property name="concurrentConsumers" value="10"/>
+        <property name="concurrentConsumers" value="1"/>
         <property name="cacheLevelName" value="CACHE_CONSUMER"/>
     </bean>
 
@@ -72,6 +66,30 @@
     <bean id="activemq2" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
         <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
     </bean>
+    <bean id="activemq3" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq4" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq5" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq6" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq7" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq8" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq9" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq10" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
 
     <camelContext xmlns="http://camel.apache.org/schema/spring";>
         <route>
@@ -79,11 +97,44 @@
             <process ref="connectionLog"/>
         </route>
 
-        <!-- better through put with a second route/connection once shared 
pool config matches concurrentConsumers -->
+        <!-- better through put with a additional route/connection once shared 
pool config matches concurrentConsumers -->
         <route>
             <from uri="activemq2:queue:scp_transacted"/>
             <process ref="connectionLog"/>
         </route>
+        <route>
+            <from uri="activemq3:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq4:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq5:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq6:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq7:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq8:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq9:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq10:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+
     </camelContext>
 
     <bean id="connectionLog" 
class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>

Modified: activemq/trunk/activemq-core/pom.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Aug 31 13:07:57 2011
@@ -463,7 +463,6 @@
             <exclude>**/QuickJournalRecoveryBrokerTest.*</exclude>
             <exclude>**/QuickJournalXARecoveryBrokerTest.*</exclude>
             <exclude>**/RendezvousDiscoverTransportTest.*</exclude>
-            <exclude>**/MissingDataFileTest.*</exclude>
 
             <!-- m2 tests failing since move from assembly  -->
             <exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
 Wed Aug 31 13:07:57 2011
@@ -286,13 +286,10 @@ public class KahaDBTransactionStore impl
 
             } else {
                 KahaTransactionInfo info = getTransactionInfo(txid);
-                // ensure message order w.r.t to cursor and store for 
setBatch()
-                synchronized (this) {
                     for (Journal journal : 
theStore.getJournalManager().getJournals()) {
                         theStore.store(journal, new 
KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
                     }
                     forgetRecoveredAcks(txid);
-                }
             }
         } else {
             LOG.error("Null transaction passed on commit");

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 Wed Aug 31 13:07:57 2011
@@ -32,7 +32,6 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
@@ -678,7 +677,7 @@ public class MessageDatabase extends Ser
                 lastRecoveryPosition = nextRecoveryPosition;
                 metadata.lastUpdate = lastRecoveryPosition;
                 JournalCommand<?> message = load(journal, 
lastRecoveryPosition);
-                process(message, lastRecoveryPosition);
+                process(message, lastRecoveryPosition, (Runnable)null);
                 nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
             }
         } finally {
@@ -779,24 +778,30 @@ public class MessageDatabase extends Ser
             long start = System.currentTimeMillis();
             Location location = journal.write(os.toByteSequence(), sync);
             long start2 = System.currentTimeMillis();
-            process(data, location);
+            process(data, location, after);
             long end = System.currentTimeMillis();
             if (LOG_SLOW_ACCESS_TIME > 0 && end - start > 
LOG_SLOW_ACCESS_TIME) {
                 LOG.info("Slow KahaDB access: Journal append took: " + (start2 
- start) + " ms, Index update took " + (end - start2) + " ms");
             }
 
-            this.indexLock.writeLock().lock();
-            try {
-                metadata.lastUpdate = location;
-            } finally {
-                this.indexLock.writeLock().unlock();
+            if (after != null) {
+                Runnable afterCompletion = null;
+                synchronized (orderedTransactionAfters) {
+                    if (!orderedTransactionAfters.empty()) {
+                        afterCompletion = orderedTransactionAfters.pop();
+                    }
+                }
+                if (afterCompletion != null) {
+                    afterCompletion.run();
+                } else {
+                    // non persistent message case
+                    after.run();
+                }
             }
+
             if (!checkpointThread.isAlive()) {
                 startCheckpoint();
             }
-            if (after != null) {
-                after.run();
-            }
             return location;
         } catch (IOException ioe) {
             LOG.error("KahaDB failed to store to Journal", ioe);
@@ -831,7 +836,7 @@ public class MessageDatabase extends Ser
      */
     void process(JournalCommand<?> data, final Location location, final 
Location inDoubtlocation) throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 
0) {
-            process(data, location);
+            process(data, location, (Runnable) null);
         } else {
             // just recover producer audit
             data.visit(new Visitor() {
@@ -848,7 +853,7 @@ public class MessageDatabase extends Ser
     // from the recovery method too so they need to be idempotent
     // /////////////////////////////////////////////////////////////////
 
-    void process(JournalCommand<?> data, final Location location) throws 
IOException {
+    void process(JournalCommand<?> data, final Location location, final 
Runnable after) throws IOException {
         data.visit(new Visitor() {
             @Override
             public void visit(KahaAddMessageCommand command) throws 
IOException {
@@ -867,7 +872,7 @@ public class MessageDatabase extends Ser
 
             @Override
             public void visit(KahaCommitCommand command) throws IOException {
-                process(command, location);
+                process(command, location, after);
             }
 
             @Override
@@ -884,6 +889,16 @@ public class MessageDatabase extends Ser
             public void visit(KahaSubscriptionCommand command) throws 
IOException {
                 process(command, location);
             }
+
+            @Override
+            public void visit(KahaProducerAuditCommand command) throws 
IOException {
+                processLocation(location);
+            }
+
+            @Override
+            public void visit(KahaTraceCommand command) {
+                processLocation(location);
+            }
         });
     }
 
@@ -950,7 +965,25 @@ public class MessageDatabase extends Ser
         }
     }
 
-    protected void process(KahaCommitCommand command, Location location) 
throws IOException {
+    protected void processLocation(final Location location) {
+        this.indexLock.writeLock().lock();
+        try {
+            metadata.lastUpdate = location;
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+    }
+
+    private final Stack<Runnable> orderedTransactionAfters = new 
Stack<Runnable>();
+    private void push(Runnable after) {
+        if (after != null) {
+            synchronized (orderedTransactionAfters) {
+                orderedTransactionAfters.push(after);
+            }
+        }
+    }
+
+    protected void process(KahaCommitCommand command, Location location, final 
Runnable after) throws IOException {
         TransactionId key = key(command.getTransactionInfo());
         List<Operation> inflightTx;
         synchronized (inflightTransactions) {
@@ -973,6 +1006,8 @@ public class MessageDatabase extends Ser
                     }
                 }
             });
+            metadata.lastUpdate = location;
+            push(after);
         } finally {
             this.indexLock.writeLock().unlock();
         }
@@ -1046,6 +1081,7 @@ public class MessageDatabase extends Ser
         }
         // record this id in any event, initial send or recovery
         metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
+        metadata.lastUpdate = location;
     }
 
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, 
Location ackLocation) throws IOException {
@@ -1079,6 +1115,7 @@ public class MessageDatabase extends Ser
             }
 
         }
+        metadata.lastUpdate = ackLocation;
     }
 
     Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, 
Set<Integer>>();
@@ -2224,6 +2261,7 @@ public class MessageDatabase extends Ser
                         cursor.lowPriorityCursorPosition = 
nextPosition.longValue();
                     }
                 } else {
+                    LOG.warn("setBatch: sequence " + sequence + " not found in 
orderindex:" + this);
                     lastDefaultKey = sequence;
                     cursor.defaultCursorPosition = nextPosition.longValue();
                 }

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
 Wed Aug 31 13:07:57 2011
@@ -28,6 +28,7 @@ import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class MissingDataFileTest extends
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MissingDataFileTest.class);
     
-    private static int counter = 300;
+    private static int counter = 500;
 
     private static int hectorToHaloCtr;
     private static int xenaToHaloCtr;
@@ -94,12 +95,13 @@ public class MissingDataFileTest extends
    
         SystemUsage systemUsage;
         systemUsage = new SystemUsage();
-        systemUsage.getMemoryUsage().setLimit(1024 * 1024); // Just a few 
messags 
+        systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few 
messags
         broker.setSystemUsage(systemUsage);
         
-        AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) 
broker.getPersistenceFactory();
-        factory.setMaxFileLength(2*1024); // ~4 messages
-        factory.setCleanupInterval(1000); // every few second
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new 
KahaDBPersistenceAdapter();
+        kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
+        kahaDBPersistenceAdapter.setCleanupInterval(500);
+        broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
         
         broker.start();
         LOG.info("Starting broker..");


Reply via email to