Author: chirino
Date: Tue Apr 1 12:35:48 2008
New Revision: 643529
URL: http://svn.apache.org/viewvc?rev=643529&view=rev
Log:
- The ack to the message store was being sent after the transaction commit.
Not good. Fixed so that the ack gets sent to the message store
as it comes in. This fixes the failing JPA tests.
Modified:
activemq/trunk/activemq-console/src/test/resources/activemq.xml
activemq/trunk/activemq-console/src/test/resources/log4j.properties
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
Modified: activemq/trunk/activemq-console/src/test/resources/activemq.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/test/resources/activemq.xml?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/test/resources/activemq.xml (original)
+++ activemq/trunk/activemq-console/src/test/resources/activemq.xml Tue Apr 1
12:35:48 2008
@@ -21,35 +21,85 @@
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<!-- Default configuration -->
- <broker useJmx="false" xmlns="http://activemq.org/config/1.0">
+ <broker brokerName="broker1" useJmx="true" persistent="false"
xmlns="http://activemq.org/config/1.0" useShutdownHook="false"
monitorConnectionSplits="true">
- <transportConnectors>
- <transportConnector uri="tcp://localhost:61616"/>
- </transportConnectors>
-
- <persistenceAdapter>
- <journaledJDBC journalLogFiles="2" dataDirectory="target/foo"/>
- </persistenceAdapter>
-
- </broker>
+ <managementContext>
+ <managementContext createConnector="false"/>
+ </managementContext>
- <!-- Example of broker configuration that uses new logging options and
dynamic management of logging
- <broker useJmx="true" xmlns="http://activemq.org/config/1.0"
persistent="false" deleteAllMessagesOnStartup="true">
-
- <transportConnectors>
- <transportConnector
uri="tcp://localhost:61616?trace=true&logWriterName=custom&dynamicManagement=true&startLogging=true"/>
- </transportConnectors>
-
- <persistenceAdapter>
- <memoryPersistenceAdapter/>
- </persistenceAdapter>
-
+ <networkConnectors>
+ <networkConnector
uri="static:(tcp://localhost:61626?socketBufferSize=256000)" userName="foo"
password="bar" dynamicOnly="false" decreaseNetworkConsumerPriority="true">
+ <excludedDestinations>
+ <topic physicalName="bbm.batch.1"/>
+ <topic
physicalName="intl.service.status"/>
+ </excludedDestinations>
+ </networkConnector>
+ </networkConnectors>
+
+ <transportConnectors>
+ <transportConnector
uri="tcp://localhost:61616?socketBufferSize=256000"/>
+ <transportConnector
uri="tcp://localhost:61618?socketBufferSize=256000"/>
+ </transportConnectors>
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue=">"
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false"
producerFlowControl="false">
+ <deadLetterStrategy>
+
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+ </deadLetterStrategy>
+ </policyEntry>
+ <policyEntry topic=">"
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false"
producerFlowControl="false">
+ <deadLetterStrategy>
+
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+ </deadLetterStrategy>
+ </policyEntry>
+
+ <policyEntry topic="intl.bbm.batch.>"
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false"
producerFlowControl="false">
+ <deadLetterStrategy>
+
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+ </deadLetterStrategy>
+ <!--
+ <pendingMessageLimitStrategy>
+
<constantPendingMessageLimitStrategy limit="${activemq.pending.message.limit}"/>
+ </pendingMessageLimitStrategy>
+ -->
+ <subscriptionRecoveryPolicy>
+
<fixedCountSubscriptionRecoveryPolicy maximumSize="10"/>
+ </subscriptionRecoveryPolicy>
+ </policyEntry>
+ <policyEntry topic="bbm.batch.>"
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false"
producerFlowControl="false">
+ <deadLetterStrategy>
+
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+ </deadLetterStrategy>
+ <!--
+ <pendingMessageLimitStrategy>
+
<constantPendingMessageLimitStrategy limit="${activemq.pending.message.limit}"/>
+ </pendingMessageLimitStrategy>
+ -->
+ <subscriptionRecoveryPolicy>
+
<fixedCountSubscriptionRecoveryPolicy maximumSize="10"/>
+ </subscriptionRecoveryPolicy>
+ </policyEntry>
+ <policyEntry
topic="intl.service.status" minimumMessageSize="1" optimizedDispatch="true"
lazyDispatch="false" producerFlowControl="false">
+ <subscriptionRecoveryPolicy>
+
<lastImageSubscriptionRecoveryPolicy/>
+ </subscriptionRecoveryPolicy>
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+ <systemUsage>
+ <systemUsage>
+ <memoryUsage>
+ <memoryUsage limit="1gb"/>
+ </memoryUsage>
+ </systemUsage>
+ </systemUsage>
+
+
</broker>
- End of example-->
-
-<!-- Note: the jmxPort=portnumber option on transportConnectors should only be
used on clients.
-On brokers, there is a default port (usually 1099) -->
-
</beans>
<!-- END SNIPPET: xbean -->
Modified: activemq/trunk/activemq-console/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/test/resources/log4j.properties?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/test/resources/log4j.properties
(original)
+++ activemq/trunk/activemq-console/src/test/resources/log4j.properties Tue Apr
1 12:35:48 2008
@@ -18,7 +18,7 @@
#
# The logging properties used during tests..
#
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.activemq.spring=WARN
# CONSOLE appender, not used by default
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Apr 1 12:35:48 2008
@@ -996,14 +996,36 @@
removeMessage(c, null, r, ack);
}
- protected void removeMessage(ConnectionContext context,Subscription
sub,QueueMessageReference reference,MessageAck ack) throws IOException {
- reference.drop();
+ protected void removeMessage(ConnectionContext context,Subscription
sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
+ reference.setAcked(true);
+ // This sends the ack the the journal..
acknowledge(context, sub, ack, reference);
- destinationStatistics.getMessages().decrement();
- synchronized(pagedInMessages) {
- pagedInMessages.remove(reference.getMessageId());
+
+ if (!ack.isInTransaction()) {
+ reference.drop();
+ destinationStatistics.getMessages().decrement();
+ synchronized(pagedInMessages) {
+ pagedInMessages.remove(reference.getMessageId());
+ }
+ wakeup();
+ } else {
+ context.getTransaction().addSynchronization(new Synchronization() {
+
+ public void afterCommit() throws Exception {
+ reference.drop();
+ destinationStatistics.getMessages().decrement();
+ synchronized(pagedInMessages) {
+ pagedInMessages.remove(reference.getMessageId());
+ }
+ wakeup();
+ }
+
+ public void afterRollback() throws Exception {
+ reference.setAcked(false);
+ }
+ });
}
- wakeup();
+
}
public void messageExpired(ConnectionContext context, PrefetchSubscription
prefetchSubscription, MessageReference reference) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Tue Apr 1 12:35:48 2008
@@ -51,20 +51,7 @@
final Destination q = n.getRegionDestination();
final QueueMessageReference node = (QueueMessageReference)n;
final Queue queue = (Queue)q;
- if (!ack.isInTransaction()) {
- queue.removeMessage(context, this, node, ack);
- } else {
- node.setAcked(true);
- context.getTransaction().addSynchronization(new Synchronization() {
- public void afterCommit() throws Exception {
- queue.removeMessage(context, QueueSubscription.this, node,
ack);
- }
-
- public void afterRollback() throws Exception {
- node.setAcked(false);
- }
- });
- }
+ queue.removeMessage(context, this, node, ack);
}
protected boolean canDispatch(MessageReference n) throws IOException {
Modified:
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
---
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
Tue Apr 1 12:35:48 2008
@@ -39,7 +39,7 @@
props.setProperty("openjpa.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL",
"jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
- props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
pa.setEntityManagerProperties(props);
service.setPersistenceAdapter(pa);
return service;
@@ -53,7 +53,7 @@
props.setProperty("openjpa.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL",
"jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
- props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
pa.setEntityManagerProperties(props);
service.setPersistenceAdapter(pa);
return service;
Modified:
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
---
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
Tue Apr 1 12:35:48 2008
@@ -40,7 +40,7 @@
props.setProperty("openjpa.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL",
"jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
- props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
@@ -57,7 +57,7 @@
props.setProperty("openjpa.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL",
"jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
- props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
Modified:
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
---
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
Tue Apr 1 12:35:48 2008
@@ -48,7 +48,7 @@
props.setProperty("openjpa.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL",
"jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
- props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
@@ -65,7 +65,7 @@
props.setProperty("openjpa.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL",
"jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
- props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
Modified:
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
---
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
(original)
+++
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
Tue Apr 1 12:35:48 2008
@@ -36,7 +36,7 @@
<prop
key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
<prop
key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
<prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
- <prop
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
+ <!-- <prop
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop> -->
</props>
</property>
</bean>
Modified:
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
---
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
(original)
+++
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
Tue Apr 1 12:35:48 2008
@@ -39,7 +39,7 @@
<prop
key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
<prop
key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
<prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
- <prop
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
+ <!-- <prop
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop> -->
</props>
</property>
</bean>