Author: gtully
Date: Wed Aug 31 13:07:57 2011
New Revision: 1163613
URL: http://svn.apache.org/viewvc?rev=1163613&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2868 - rework to remove sync on
transaction completion, cursor updates are now stacked so that they ocurr in
order, independent of thread execution after waiting for the journal to
complete a write. This ensures that the cursors are updates in the same order
as the index while still working wo the index lock. TransactedConsumerTest
shows horizontal scaling now works better with transactions. Reworked
metadata.lastUpdate to always work with the existing index lock rather than
reaquire, this may help with spurious gc journal data file issue on windows -
https://issues.apache.org/jira/browse/AMQ-3470
Modified:
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
Modified:
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
---
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
(original)
+++
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
Wed Aug 31 13:07:57 2011
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.camel;
-import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.MessageProducer;
@@ -28,7 +27,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -88,9 +87,12 @@ public class TransactedConsumeTest exten
brokerService.setAdvisorySupport(false);
brokerService.setDataDirectory("target/data");
- AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
- amq.setDirectory(new File("target/data"));
- brokerService.setPersistenceAdapter(amq);
+ //AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
+ //amq.setDirectory(new File("target/data"));
+ //brokerService.setPersistenceAdapter(amq);
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter =
(KahaDBPersistenceAdapter)
+ brokerService.getPersistenceAdapter();
+ kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
brokerService.addConnector("tcp://localhost:61616");
return brokerService;
}
Modified:
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
---
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
(original)
+++
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
Wed Aug 31 13:07:57 2011
@@ -28,22 +28,17 @@
<context:annotation-config/>
<bean id="vhfBatchListenerJMSConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL"
value="tcp://localhost:61616?jms.prefetchPolicy.all=1000"/>
+ <property name="brokerURL"
value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
</bean>
<bean id="vhfBatchListenerPooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
- <!-- match maxConnections to the number of routes that share the
connection factory -->
- <property name="maxConnections" value="2"/>
- <!-- match maximumActive (which is active sessions) to num routes *
concurrentConsumers in the MLC -->
- <property name="maximumActive" value="20"/>
+ <!-- match maxConnections to the number of routes that share the
connection factory -->
+ <property name="maxConnections" value="10"/>
+ <!-- match maximumActive (which is active sessions) >=
concurrentConsumers in the MLC -->
+ <property name="maximumActive" value="1"/>
<property name="connectionFactory"
ref="vhfBatchListenerJMSConnectionFactory"/>
</bean>
- <!-- bean id="vhfBatchListenerSingleConnectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
- <property name="reconnectOnException" value="true" />
- <property name="targetConnectionFactory"
ref="vhfBatchListenerJMSConnectionFactory" />
- </bean -->
-
<!-- JMS Transaction manager -->
<bean id="vhfBatchListenerJMSTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory"
ref="vhfBatchListenerPooledConnectionFactory"/>
@@ -53,9 +48,8 @@
<bean id="vhfBatchListenerJMSConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory"
ref="vhfBatchListenerPooledConnectionFactory"/>
<property name="transactionManager"
ref="vhfBatchListenerJMSTransactionManager"/>
- <property name="receiveTimeout" value="20000" />
<property name="transacted" value="true"/>
- <property name="concurrentConsumers" value="10"/>
+ <property name="concurrentConsumers" value="1"/>
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
</bean>
@@ -72,6 +66,30 @@
<bean id="activemq2"
class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
</bean>
+ <bean id="activemq3"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+ <bean id="activemq4"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+ <bean id="activemq5"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+ <bean id="activemq6"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+ <bean id="activemq7"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+ <bean id="activemq8"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+ <bean id="activemq9"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+ <bean id="activemq10"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
@@ -79,11 +97,44 @@
<process ref="connectionLog"/>
</route>
- <!-- better through put with a second route/connection once shared
pool config matches concurrentConsumers -->
+ <!-- better through put with a additional route/connection once shared
pool config matches concurrentConsumers -->
<route>
<from uri="activemq2:queue:scp_transacted"/>
<process ref="connectionLog"/>
</route>
+ <route>
+ <from uri="activemq3:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+ <route>
+ <from uri="activemq4:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+ <route>
+ <from uri="activemq5:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+ <route>
+ <from uri="activemq6:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+ <route>
+ <from uri="activemq7:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+ <route>
+ <from uri="activemq8:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+ <route>
+ <from uri="activemq9:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+ <route>
+ <from uri="activemq10:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
+ </route>
+
</camelContext>
<bean id="connectionLog"
class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
Modified: activemq/trunk/activemq-core/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Aug 31 13:07:57 2011
@@ -463,7 +463,6 @@
<exclude>**/QuickJournalRecoveryBrokerTest.*</exclude>
<exclude>**/QuickJournalXARecoveryBrokerTest.*</exclude>
<exclude>**/RendezvousDiscoverTransportTest.*</exclude>
- <exclude>**/MissingDataFileTest.*</exclude>
<!-- m2 tests failing since move from assembly -->
<exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
Wed Aug 31 13:07:57 2011
@@ -286,13 +286,10 @@ public class KahaDBTransactionStore impl
} else {
KahaTransactionInfo info = getTransactionInfo(txid);
- // ensure message order w.r.t to cursor and store for
setBatch()
- synchronized (this) {
for (Journal journal :
theStore.getJournalManager().getJournals()) {
theStore.store(journal, new
KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
}
forgetRecoveredAcks(txid);
- }
}
} else {
LOG.error("Null transaction passed on commit");
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Wed Aug 31 13:07:57 2011
@@ -32,7 +32,6 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@@ -678,7 +677,7 @@ public class MessageDatabase extends Ser
lastRecoveryPosition = nextRecoveryPosition;
metadata.lastUpdate = lastRecoveryPosition;
JournalCommand<?> message = load(journal,
lastRecoveryPosition);
- process(message, lastRecoveryPosition);
+ process(message, lastRecoveryPosition, (Runnable)null);
nextRecoveryPosition =
journal.getNextLocation(lastRecoveryPosition);
}
} finally {
@@ -779,24 +778,30 @@ public class MessageDatabase extends Ser
long start = System.currentTimeMillis();
Location location = journal.write(os.toByteSequence(), sync);
long start2 = System.currentTimeMillis();
- process(data, location);
+ process(data, location, after);
long end = System.currentTimeMillis();
if (LOG_SLOW_ACCESS_TIME > 0 && end - start >
LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: Journal append took: " + (start2
- start) + " ms, Index update took " + (end - start2) + " ms");
}
- this.indexLock.writeLock().lock();
- try {
- metadata.lastUpdate = location;
- } finally {
- this.indexLock.writeLock().unlock();
+ if (after != null) {
+ Runnable afterCompletion = null;
+ synchronized (orderedTransactionAfters) {
+ if (!orderedTransactionAfters.empty()) {
+ afterCompletion = orderedTransactionAfters.pop();
+ }
+ }
+ if (afterCompletion != null) {
+ afterCompletion.run();
+ } else {
+ // non persistent message case
+ after.run();
+ }
}
+
if (!checkpointThread.isAlive()) {
startCheckpoint();
}
- if (after != null) {
- after.run();
- }
return location;
} catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal", ioe);
@@ -831,7 +836,7 @@ public class MessageDatabase extends Ser
*/
void process(JournalCommand<?> data, final Location location, final
Location inDoubtlocation) throws IOException {
if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >=
0) {
- process(data, location);
+ process(data, location, (Runnable) null);
} else {
// just recover producer audit
data.visit(new Visitor() {
@@ -848,7 +853,7 @@ public class MessageDatabase extends Ser
// from the recovery method too so they need to be idempotent
// /////////////////////////////////////////////////////////////////
- void process(JournalCommand<?> data, final Location location) throws
IOException {
+ void process(JournalCommand<?> data, final Location location, final
Runnable after) throws IOException {
data.visit(new Visitor() {
@Override
public void visit(KahaAddMessageCommand command) throws
IOException {
@@ -867,7 +872,7 @@ public class MessageDatabase extends Ser
@Override
public void visit(KahaCommitCommand command) throws IOException {
- process(command, location);
+ process(command, location, after);
}
@Override
@@ -884,6 +889,16 @@ public class MessageDatabase extends Ser
public void visit(KahaSubscriptionCommand command) throws
IOException {
process(command, location);
}
+
+ @Override
+ public void visit(KahaProducerAuditCommand command) throws
IOException {
+ processLocation(location);
+ }
+
+ @Override
+ public void visit(KahaTraceCommand command) {
+ processLocation(location);
+ }
});
}
@@ -950,7 +965,25 @@ public class MessageDatabase extends Ser
}
}
- protected void process(KahaCommitCommand command, Location location)
throws IOException {
+ protected void processLocation(final Location location) {
+ this.indexLock.writeLock().lock();
+ try {
+ metadata.lastUpdate = location;
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+ }
+
+ private final Stack<Runnable> orderedTransactionAfters = new
Stack<Runnable>();
+ private void push(Runnable after) {
+ if (after != null) {
+ synchronized (orderedTransactionAfters) {
+ orderedTransactionAfters.push(after);
+ }
+ }
+ }
+
+ protected void process(KahaCommitCommand command, Location location, final
Runnable after) throws IOException {
TransactionId key = key(command.getTransactionInfo());
List<Operation> inflightTx;
synchronized (inflightTransactions) {
@@ -973,6 +1006,8 @@ public class MessageDatabase extends Ser
}
}
});
+ metadata.lastUpdate = location;
+ push(after);
} finally {
this.indexLock.writeLock().unlock();
}
@@ -1046,6 +1081,7 @@ public class MessageDatabase extends Ser
}
// record this id in any event, initial send or recovery
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
+ metadata.lastUpdate = location;
}
void updateIndex(Transaction tx, KahaRemoveMessageCommand command,
Location ackLocation) throws IOException {
@@ -1079,6 +1115,7 @@ public class MessageDatabase extends Ser
}
}
+ metadata.lastUpdate = ackLocation;
}
Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer,
Set<Integer>>();
@@ -2224,6 +2261,7 @@ public class MessageDatabase extends Ser
cursor.lowPriorityCursorPosition =
nextPosition.longValue();
}
} else {
+ LOG.warn("setBatch: sequence " + sequence + " not found in
orderindex:" + this);
lastDefaultKey = sequence;
cursor.defaultCursorPosition = nextPosition.longValue();
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
Wed Aug 31 13:07:57 2011
@@ -28,6 +28,7 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class MissingDataFileTest extends
private static final Logger LOG =
LoggerFactory.getLogger(MissingDataFileTest.class);
- private static int counter = 300;
+ private static int counter = 500;
private static int hectorToHaloCtr;
private static int xenaToHaloCtr;
@@ -94,12 +95,13 @@ public class MissingDataFileTest extends
SystemUsage systemUsage;
systemUsage = new SystemUsage();
- systemUsage.getMemoryUsage().setLimit(1024 * 1024); // Just a few
messags
+ systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few
messags
broker.setSystemUsage(systemUsage);
- AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory)
broker.getPersistenceFactory();
- factory.setMaxFileLength(2*1024); // ~4 messages
- factory.setCleanupInterval(1000); // every few second
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new
KahaDBPersistenceAdapter();
+ kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
+ kahaDBPersistenceAdapter.setCleanupInterval(500);
+ broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
broker.start();
LOG.info("Starting broker..");