Author: chirino
Date: Tue Apr  1 12:35:48 2008
New Revision: 643529

URL: http://svn.apache.org/viewvc?rev=643529&view=rev
Log:
- The ack to the message store was being sent after the transaction commit.  
Not good.  Fixed so that the ack gets sent to the message store
  as it comes in.  This fixes the failing jpa tests.


Modified:
    activemq/trunk/activemq-console/src/test/resources/activemq.xml
    activemq/trunk/activemq-console/src/test/resources/log4j.properties
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
    
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
    
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
    
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
    
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml

Modified: activemq/trunk/activemq-console/src/test/resources/activemq.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/test/resources/activemq.xml?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/test/resources/activemq.xml (original)
+++ activemq/trunk/activemq-console/src/test/resources/activemq.xml Tue Apr  1 
12:35:48 2008
@@ -21,35 +21,85 @@
   <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
   
   <!-- Default configuration -->
-  <broker useJmx="false" xmlns="http://activemq.org/config/1.0">
+  <broker brokerName="broker1" useJmx="true" persistent="false" 
xmlns="http://activemq.org/config/1.0" useShutdownHook="false" 
monitorConnectionSplits="true">
          
-       <transportConnectors>
-      <transportConnector uri="tcp://localhost:61616"/>
-    </transportConnectors>
-         
-       <persistenceAdapter>
-         <journaledJDBC journalLogFiles="2" dataDirectory="target/foo"/>
-       </persistenceAdapter>
-         
-  </broker>
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
 
-  <!-- Example of broker configuration that uses new logging options and 
dynamic management of logging
-  <broker useJmx="true" xmlns="http://activemq.org/config/1.0" 
persistent="false" deleteAllMessagesOnStartup="true">
-  
-    <transportConnectors>
-      <transportConnector 
uri="tcp://localhost:61616?trace=true&amp;logWriterName=custom&amp;dynamicManagement=true&amp;startLogging=true"/>
-    </transportConnectors>
-    
-    <persistenceAdapter>
-      <memoryPersistenceAdapter/>
-    </persistenceAdapter>
-    
+        <networkConnectors>
+            <networkConnector 
uri="static:(tcp://localhost:61626?socketBufferSize=256000)" userName="foo" 
password="bar" dynamicOnly="false" decreaseNetworkConsumerPriority="true">
+                               <excludedDestinations>
+                                       <topic physicalName="bbm.batch.1"/>
+                                       <topic 
physicalName="intl.service.status"/>
+                               </excludedDestinations>
+                       </networkConnector>
+               </networkConnectors>
+
+               <transportConnectors>
+                       <transportConnector 
uri="tcp://localhost:61616?socketBufferSize=256000"/>
+                       <transportConnector 
uri="tcp://localhost:61618?socketBufferSize=256000"/>
+               </transportConnectors>
+
+               <destinationPolicy>
+                       <policyMap>
+                               <policyEntries>
+                                       <policyEntry queue=">" 
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" 
producerFlowControl="false">
+                                               <deadLetterStrategy>
+                                                       
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+                                               </deadLetterStrategy>
+                                       </policyEntry>
+                                       <policyEntry topic=">" 
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" 
producerFlowControl="false">
+                                               <deadLetterStrategy>
+                                                       
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+                                               </deadLetterStrategy>
+                                       </policyEntry>
+                                       
+                                       <policyEntry topic="intl.bbm.batch.>" 
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" 
producerFlowControl="false">
+                                               <deadLetterStrategy>
+                                                       
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+                                               </deadLetterStrategy>
+                                               <!-- 
+                                               <pendingMessageLimitStrategy>
+                                                       
<constantPendingMessageLimitStrategy limit="${activemq.pending.message.limit}"/>
+                                               </pendingMessageLimitStrategy>
+                                                -->
+                                               <subscriptionRecoveryPolicy>
+                                                       
<fixedCountSubscriptionRecoveryPolicy maximumSize="10"/>
+                                               </subscriptionRecoveryPolicy>
+                                       </policyEntry>
+                                       <policyEntry topic="bbm.batch.>" 
minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" 
producerFlowControl="false">
+                                               <deadLetterStrategy>
+                                                       
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
+                                               </deadLetterStrategy>
+                                               <!-- 
+                                               <pendingMessageLimitStrategy>
+                                                       
<constantPendingMessageLimitStrategy limit="${activemq.pending.message.limit}"/>
+                                               </pendingMessageLimitStrategy>
+                                                -->
+                                               <subscriptionRecoveryPolicy>
+                                                       
<fixedCountSubscriptionRecoveryPolicy maximumSize="10"/>
+                                               </subscriptionRecoveryPolicy>
+                                       </policyEntry>
+                                       <policyEntry 
topic="intl.service.status" minimumMessageSize="1" optimizedDispatch="true" 
lazyDispatch="false" producerFlowControl="false"> 
+                                               <subscriptionRecoveryPolicy>
+                                                       
<lastImageSubscriptionRecoveryPolicy/>
+                                               </subscriptionRecoveryPolicy>
+                                       </policyEntry>
+                               </policyEntries>
+                       </policyMap>
+               </destinationPolicy>
+               
+               <systemUsage>
+                       <systemUsage>
+                               <memoryUsage>
+                                       <memoryUsage limit="1gb"/>
+                               </memoryUsage>
+                       </systemUsage>  
+               </systemUsage>
+       
+         
   </broker>
-  End of example-->
-  
-<!-- Note: the jmxPort=portnumber option on transportConnectors should only be 
used on clients.
-On brokers, there is a default port (usually 1099) -->
-  
   
 </beans>
 <!-- END SNIPPET: xbean -->

Modified: activemq/trunk/activemq-console/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/test/resources/log4j.properties?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/test/resources/log4j.properties 
(original)
+++ activemq/trunk/activemq-console/src/test/resources/log4j.properties Tue Apr 
 1 12:35:48 2008
@@ -18,7 +18,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=INFO, stdout
 log4j.logger.org.apache.activemq.spring=WARN
 
 # CONSOLE appender, not used by default

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Tue Apr  1 12:35:48 2008
@@ -996,14 +996,36 @@
         removeMessage(c, null, r, ack);
     }
     
-    protected void removeMessage(ConnectionContext context,Subscription 
sub,QueueMessageReference reference,MessageAck ack) throws IOException {
-        reference.drop();
+    protected void removeMessage(ConnectionContext context,Subscription 
sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
+        reference.setAcked(true);
+        // This sends the ack to the journal.
         acknowledge(context, sub, ack, reference);
-        destinationStatistics.getMessages().decrement();
-        synchronized(pagedInMessages) {
-            pagedInMessages.remove(reference.getMessageId());
+
+        if (!ack.isInTransaction()) {
+            reference.drop();
+            destinationStatistics.getMessages().decrement();
+            synchronized(pagedInMessages) {
+                pagedInMessages.remove(reference.getMessageId());
+            }
+            wakeup();
+        } else {
+            context.getTransaction().addSynchronization(new Synchronization() {
+                
+                public void afterCommit() throws Exception {
+                    reference.drop();
+                    destinationStatistics.getMessages().decrement();
+                    synchronized(pagedInMessages) {
+                        pagedInMessages.remove(reference.getMessageId());
+                    }
+                    wakeup();
+                }
+                
+                public void afterRollback() throws Exception {
+                    reference.setAcked(false);
+                }
+            });
         }
-        wakeup();
+
     }
     
     public void messageExpired(ConnectionContext context, PrefetchSubscription 
prefetchSubscription, MessageReference reference) {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 Tue Apr  1 12:35:48 2008
@@ -51,20 +51,7 @@
         final Destination q = n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;
-        if (!ack.isInTransaction()) {
-            queue.removeMessage(context, this, node, ack);
-        } else {
-            node.setAcked(true);
-            context.getTransaction().addSynchronization(new Synchronization() {
-                public void afterCommit() throws Exception {
-                    queue.removeMessage(context, QueueSubscription.this, node, 
ack);
-                }
-
-                public void afterRollback() throws Exception {
-                    node.setAcked(false);
-                }
-            });
-        }
+        queue.removeMessage(context, this, node, ack);
     }
 
     protected boolean canDispatch(MessageReference n) throws IOException {

Modified: 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
 (original)
+++ 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
 Tue Apr  1 12:35:48 2008
@@ -39,7 +39,7 @@
         props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
         props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
         props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
-        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+//        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
         pa.setEntityManagerProperties(props);
         service.setPersistenceAdapter(pa);
         return service;
@@ -53,7 +53,7 @@
         props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
         props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
         props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
-        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+//        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
         pa.setEntityManagerProperties(props);
         service.setPersistenceAdapter(pa);
         return service;

Modified: 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
 (original)
+++ 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java
 Tue Apr  1 12:35:48 2008
@@ -40,7 +40,7 @@
         props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
         props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
         props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
-        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+//        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
         rfa.setEntityManagerProperties(props);
         pa.setReferenceStoreAdapter(rfa);        
         
@@ -57,7 +57,7 @@
         props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
         props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
         props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
-        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+//        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
         rfa.setEntityManagerProperties(props);
         pa.setReferenceStoreAdapter(rfa);        
 

Modified: 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
 (original)
+++ 
activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java
 Tue Apr  1 12:35:48 2008
@@ -48,7 +48,7 @@
         props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
         props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
         props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
-        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+//        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
         rfa.setEntityManagerProperties(props);
         pa.setReferenceStoreAdapter(rfa);        
         
@@ -65,7 +65,7 @@
         props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
         props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
         props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
-        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+//        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
         rfa.setEntityManagerProperties(props);
         pa.setReferenceStoreAdapter(rfa);        
         

Modified: 
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- 
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
 (original)
+++ 
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml
 Tue Apr  1 12:35:48 2008
@@ -36,7 +36,7 @@
                        <prop 
key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
                <prop 
key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
                <prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
-                       <prop 
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
+                       <!--  <prop 
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>  --> 
        </props>
        </property>     
   </bean>

Modified: 
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml?rev=643529&r1=643528&r2=643529&view=diff
==============================================================================
--- 
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
 (original)
+++ 
activemq/trunk/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml
 Tue Apr  1 12:35:48 2008
@@ -39,7 +39,7 @@
                        <prop 
key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
                <prop 
key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
                <prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
-                       <prop 
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
+                       <!--  <prop 
key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>  -->
        </props>
        </property>     
   </bean>


Reply via email to