Author: rajdavies
Date: Wed Sep 10 11:01:46 2008
New Revision: 693915

URL: http://svn.apache.org/viewvc?rev=693915&view=rev
Log:
Applied patch for https://issues.apache.org/activemq/browse/AMQ-1925

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 Wed Sep 10 11:01:46 2008
@@ -40,6 +40,7 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.management.JMSConsumerStatsImpl;
 import org.apache.activemq.management.StatsCapable;
@@ -607,14 +608,13 @@
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
             if (this.optimizeAcknowledge) {
-                synchronized(deliveredMessages) {
-                    if (!deliveredMessages.isEmpty()) {
-                        MessageDispatch md = deliveredMessages.getFirst();
-                        ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 
deliveredMessages.size());
-                        deliveredMessages.clear();
-                        ackCounter = 0;
-                    }
-                }
+               synchronized(deliveredMessages) {
+                       ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                       if (ack != null) {
+                               deliveredMessages.clear();
+                               ackCounter = 0;
+                       }
+               }
             }
             if (ack != null) {
                 final MessageAck ackToSend = ack;
@@ -756,17 +756,21 @@
                                 ackCounter++;
                                 if (ackCounter >= (info
                                         .getCurrentPrefetchSize() * .65)) {
-                                    MessageAck ack = new 
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                                    session.sendAck(ack);
-                                    ackCounter = 0;
-                                    deliveredMessages.clear();
+                                       MessageAck ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                       if (ack != null) {
+                                           deliveredMessages.clear();
+                                           ackCounter = 0;
+                                           session.sendAck(ack);
+                                       }
                                 }
                                 deliveryingAcknowledgements.set(false);
                             }
                         } else {
-                            MessageAck ack = new 
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                            session.sendAck(ack);
-                            deliveredMessages.clear();
+                            MessageAck ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                            if (ack!=null) {
+                               deliveredMessages.clear();
+                               session.sendAck(ack);
+                            }
                         }
                     }
                 }
@@ -781,6 +785,25 @@
         }
     }
 
+    /**
+     * Builds one MessageAck of the given type for everything in deliveredMessages,
+     * using getFirst() as the acked dispatch, size() as the count and getLast() as the
+     * first message id. Caller should hold the deliveredMessages lock (also taken here).
+     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 
+     * @return <code>null</code> if deliveredMessages is empty, otherwise the new ack.
+     */
+       private MessageAck makeAckForAllDeliveredMessages(byte type) {
+               synchronized (deliveredMessages) {
+                       if (deliveredMessages.isEmpty())
+                               return null;
+                           
+                       MessageDispatch md = deliveredMessages.getFirst();
+                   MessageAck ack = new MessageAck(md, type, 
deliveredMessages.size());
+                   
ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
+                   return ack;
+               }
+       }
+
     private void ackLater(MessageDispatch md, byte ackType) throws 
JMSException {
 
         // Don't acknowledge now, but we may need to let the broker know the
@@ -814,6 +837,7 @@
         deliveredCounter++;
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - 
additionalWindowSize)) {
             MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
+            
ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
             
ack.setTransactionId(session.getTransactionContext().getTransactionId());
             session.sendAck(ack);
             additionalWindowSize = deliveredCounter;
@@ -834,13 +858,11 @@
      */
     public void acknowledge() throws JMSException {
         synchronized(deliveredMessages) {
-            if (deliveredMessages.isEmpty()) {
-                return;
-            }
-    
-            // Acknowledge the last message.
-            MessageDispatch lastMd = deliveredMessages.get(0);
-            MessageAck ack = new MessageAck(lastMd, 
MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
+            // Acknowledge all messages so far.
+            MessageAck ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+            if (ack == null)
+               return; // no msgs
+            
             if (session.isTransacted()) {
                 session.doStartTransaction();
                 
ack.setTransactionId(session.getTransactionContext().getTransactionId());
@@ -897,6 +919,7 @@
                 if (lastMd.getMessage().getRedeliveryCounter() > 0) {
                     redeliveryDelay = 
redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                 }
+                MessageId firstMsgId = 
deliveredMessages.getLast().getMessage().getMessageId();
     
                 for (Iterator iter = deliveredMessages.iterator(); 
iter.hasNext();) {
                     MessageDispatch md = (MessageDispatch)iter.next();
@@ -910,6 +933,7 @@
                     // Acknowledge the last message.
                     
                     MessageAck ack = new MessageAck(lastMd, 
MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+                                       ack.setFirstMessageId(firstMsgId);
                     session.sendAck(ack,true);
                     // ensure we don't filter this as a duplicate
                     session.connection.rollbackDuplicate(this, 
lastMd.getMessage());
@@ -919,6 +943,7 @@
                 } else {
                     
                     MessageAck ack = new MessageAck(lastMd, 
MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+                    ack.setFirstMessageId(firstMsgId);
                     session.sendAck(ack,true);
     
                     // stop the delivery of messages.

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Wed Sep 10 11:01:46 2008
@@ -180,9 +180,12 @@
         Destination destination = null;
         synchronized(dispatchLock) {
             if (ack.isStandardAck()) {
+               // First check if the ack matches the dispatched. When using 
failover this might
+               // not be the case. We don't ever want to ack the wrong 
messages.
+               assertAckMatchesDispatched(ack);
+               
                 // Acknowledge all dispatched messages up till the message id 
of
-                // the
-                // acknowledgment.
+                // the acknowledgment.
                 int index = 0;
                 boolean inAckRange = false;
                 List<MessageReference> removeList = new 
ArrayList<MessageReference>();
@@ -263,11 +266,8 @@
                 // this only happens after a reconnect - get an ack which is 
not
                 // valid
                 if (!callDispatchMatched) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG
-                                .debug("Could not correlate acknowledgment 
with dispatched message: "
-                                        + ack);
-                    }
+                        LOG.error("Could not correlate acknowledgment with 
dispatched message: "
+                                  + ack);
                 }
             } else if (ack.isIndividualAck()) {
                 // Message was delivered and acknowledge - but only delete the
@@ -410,6 +410,45 @@
     }
 
     /**
+     * Checks an ack versus the contents of the dispatched list: the ack's first and last
+     * message ids (when set) must both be present, and its message count must equal the
+     * number of dispatched entries counted from the first id up to and including the last.
+     * 
+     * @param ack the acknowledgement to validate
+     * @throws JMSException if it does not match
+     */
+       protected void assertAckMatchesDispatched(MessageAck ack)
+                       throws JMSException {
+        MessageId firstAckedMsg = ack.getFirstMessageId();
+               MessageId lastAckedMsg = ack.getLastMessageId();
+
+               int checkCount = 0;
+               boolean checkFoundStart = false;
+               boolean checkFoundEnd = false;
+               for (MessageReference node : dispatched) {
+                       if (!checkFoundStart && firstAckedMsg != null && 
firstAckedMsg.equals(node.getMessageId())) {
+                               checkFoundStart = true;
+                       }
+
+                       if (checkFoundStart || firstAckedMsg == null)
+                               checkCount++;
+
+                       if (lastAckedMsg != null && 
lastAckedMsg.equals(node.getMessageId())) {
+                               checkFoundEnd = true;
+                               break;
+                       }
+               }
+               if (!checkFoundStart && firstAckedMsg != null)
+                       throw new JMSException("Unmatched acknowledege: Could 
not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
+               if (!checkFoundEnd && lastAckedMsg != null)
+                       throw new JMSException("Unmatched acknowledege: Could 
not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)"); // report the id that was actually missing
+               if (ack.getMessageCount() != checkCount) {
+                       throw new JMSException("Unmatched acknowledege: 
Expected message count ("+ack.getMessageCount()+
+                                       ") differs from count in 
dispatched-list ("+checkCount+")");
+               }
+       }
+
+    /**
      * @param context
      * @param node
      * @throws IOException
@@ -429,7 +468,7 @@
      * @return
      */
     public boolean isFull() {
-        return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
+        return isSlave() || dispatched.size() - prefetchExtension >= 
info.getPrefetchSize();
     }
 
     /**

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
 Wed Sep 10 11:01:46 2008
@@ -19,6 +19,7 @@
 import java.io.File;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
 
 /**
  * @version $Revision: 1.3 $
@@ -29,9 +30,14 @@
         File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
         dataFileDir.mkdirs();
         answer.setDeleteAllMessagesOnStartup(true);
-        AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
-        adaptor.setArchiveDataLogs(true);
+        //AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
+        //adaptor.setArchiveDataLogs(true);
         //adaptor.setMaxFileLength(1024 * 64);
+        
+         KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
+        //adaptor.setDirectory(dataFileDir);
+         
+        
         answer.setDataDirectoryFile(dataFileDir);
         answer.setPersistenceAdapter(adaptor);
         answer.addConnector(uri);

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
 Wed Sep 10 11:01:46 2008
@@ -31,8 +31,8 @@
     
     protected void setUp() throws Exception {
         numberOfDestinations=1;
-        numberOfConsumers = 4;
-        numberofProducers = 1;
+        numberOfConsumers = 2;
+        numberofProducers = 2;
         sampleCount=1000;
         playloadSize = 1024;
         super.setUp();
@@ -41,6 +41,8 @@
     protected void configureBroker(BrokerService answer,String uri) throws 
Exception {
         AMQPersistenceAdapterFactory persistenceFactory = new 
AMQPersistenceAdapterFactory();
         persistenceFactory.setMaxFileLength(1024*16);
+        persistenceFactory.setPersistentIndex(true);
+        persistenceFactory.setCleanupInterval(10000);
         answer.setPersistenceFactory(persistenceFactory);
         answer.setDeleteAllMessagesOnStartup(true);
         answer.addConnector(uri);
@@ -55,7 +57,7 @@
 
     protected PerfConsumer createConsumer(ConnectionFactory fac, Destination 
dest, int number) throws JMSException {
         PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number);
-        result.setInitialDelay(2000);
+        result.setInitialDelay(0);
         return result;
     }
     

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
 Wed Sep 10 11:01:46 2008
@@ -31,12 +31,14 @@
     }
     
     protected void setUp() throws Exception {
-      
+        numberOfConsumers = 1;
         super.setUp();
     }
     
     protected PerfConsumer createConsumer(ConnectionFactory fac, Destination 
dest, int number) throws JMSException {
         PerfConsumer consumer =  new PerfConsumer(fac, dest);
+        //consumer.setInitialDelay(2000);
+        //consumer.setSleepDuration(10);
         boolean enableAudit = numberOfConsumers <= 1;
         System.out.println("Enable Audit = " + enableAudit);
         consumer.setEnableAudit(enableAudit);

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java?rev=693915&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
 Wed Sep 10 11:01:46 2008
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.log4j.Logger;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test case for the message loss described in AMQ-1925: a failover client drains a
+ * pre-filled persistent queue in transacted sessions while the broker is restarted.
+ * @version $Revision: 1.1 $
+ */
+public class AMQ1925Test extends TestCase {
+       private static final Logger log = Logger.getLogger(AMQ1925Test.class);
+
+       private static final String QUEUE_NAME = "test.amq1925";
+       private static final String PROPERTY_MSG_NUMBER = "NUMBER";
+       private static final int MESSAGE_COUNT = 10000;
+
+       private BrokerService bs;
+       private URI tcpUri;
+       private ActiveMQConnectionFactory cf;
+
+       public void testAMQ1925_TXInProgress() throws Exception {
+               Connection connection = cf.createConnection();
+               connection.start();
+               Session session = connection.createSession(true,
+                               Session.SESSION_TRANSACTED);
+               MessageConsumer consumer = session.createConsumer(session
+                               .createQueue(QUEUE_NAME));
+
+               // The runnable is likely to interrupt during the 
session#commit, since
+               // this takes the longest
+               final Object starter = new Object();
+               final AtomicBoolean restarted = new AtomicBoolean();
+               new Thread(new Runnable() {
+                       public void run() {
+                               try {
+                                       synchronized (starter) {
+                                               starter.wait();
+                                       }
+
+                                       // Simulate broker failure & restart: stop the broker, then start a fresh one on the same URI
+                                       bs.stop();
+                                       bs = new BrokerService();
+                                       bs.setPersistent(true);
+                                       bs.setUseJmx(true);
+                                       bs.addConnector(tcpUri);
+                                       bs.start();
+
+                                       restarted.set(true);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }).start();
+
+               synchronized (starter) {
+                       starter.notifyAll();
+               }
+               for (int i = 0; i < MESSAGE_COUNT; i++) {
+                       Message message = consumer.receive(500);
+                       assertNotNull("No Message " + i + " found", message);
+
+                       if (i < 10)
+                               assertFalse("Timing problem, restarted too 
soon", restarted
+                                               .get());
+                       if (i == 10) {
+                               synchronized (starter) {
+                                       starter.notifyAll();
+                               }
+                       }
+                       if (i > MESSAGE_COUNT - 100) {
+                               assertTrue("Timing problem, restarted too 
late", restarted
+                                               .get());
+                       }
+
+                       assertEquals(i, 
message.getIntProperty(PROPERTY_MSG_NUMBER));
+                       session.commit();
+               }
+               assertNull(consumer.receive(500));
+
+               consumer.close();
+               session.close();
+               connection.close();
+
+               assertQueueEmpty();
+       }
+
+       public void XtestAMQ1925_TXInProgress_TwoConsumers() throws Exception {
+               Connection connection = cf.createConnection();
+               connection.start();
+               Session session1 = connection.createSession(true,
+                               Session.SESSION_TRANSACTED);
+               MessageConsumer consumer1 = session1.createConsumer(session1
+                               .createQueue(QUEUE_NAME));
+               Session session2 = connection.createSession(true,
+                               Session.SESSION_TRANSACTED);
+               MessageConsumer consumer2 = session2.createConsumer(session2
+                               .createQueue(QUEUE_NAME));
+
+               // The runnable is likely to interrupt during the 
session#commit, since
+               // this takes the longest
+               final Object starter = new Object();
+               final AtomicBoolean restarted = new AtomicBoolean();
+               new Thread(new Runnable() {
+                       public void run() {
+                               try {
+                                       synchronized (starter) {
+                                               starter.wait();
+                                       }
+
+                                       // Simulate broker failure & restart: stop the broker, then start a fresh one on the same URI
+                                       bs.stop();
+                                       bs = new BrokerService();
+                                       bs.setPersistent(true);
+                                       bs.setUseJmx(true);
+                                       bs.addConnector(tcpUri);
+                                       bs.start();
+
+                                       restarted.set(true);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }).start();
+
+               synchronized (starter) {
+                       starter.notifyAll();
+               }
+               Collection<Integer> results = new 
ArrayList<Integer>(MESSAGE_COUNT);
+               for (int i = 0; i < MESSAGE_COUNT; i++) {
+                       Message message1 = consumer1.receive(20);
+                       Message message2 = consumer2.receive(20);
+                       if (message1 == null && message2 == null) {
+                               if (results.size() < MESSAGE_COUNT) {
+                                       message1 = consumer1.receive(500);
+                                       message2 = consumer2.receive(500);
+
+                                       if (message1 == null && message2 == 
null) {
+                                               // Missing messages: neither consumer received anything within 500 ms
+                                               break;
+                                       }
+                               }
+                               break;
+                       }
+
+                       if (i < 10)
+                               assertFalse("Timing problem, restarted too 
soon", restarted
+                                               .get());
+                       if (i == 10) {
+                               synchronized (starter) {
+                                       starter.notifyAll();
+                               }
+                       }
+                       if (i > MESSAGE_COUNT - 50) {
+                               assertTrue("Timing problem, restarted too 
late", restarted
+                                               .get());
+                       }
+
+                       if (message1 != null) {
+                               
results.add(message1.getIntProperty(PROPERTY_MSG_NUMBER));
+                               session1.commit();
+                       }
+                       if (message2 != null) {
+                               
results.add(message2.getIntProperty(PROPERTY_MSG_NUMBER));
+                               session2.commit();
+                       }
+               }
+               assertNull(consumer1.receive(500));
+               assertNull(consumer2.receive(500));
+
+               consumer1.close();
+               session1.close();
+               consumer2.close();
+               session2.close();
+               connection.close();
+
+               int foundMissingMessages = 0;
+               if (results.size() < MESSAGE_COUNT) {
+                       foundMissingMessages = tryToFetchMissingMessages();
+               }
+               for (int i = 0; i < MESSAGE_COUNT; i++) {
+                       assertTrue("Message-Nr " + i + " not found (" + 
results.size()
+                                       + " total, " + foundMissingMessages
+                                       + " have been found 'lingering' in the 
queue)", results
+                                       .contains(i));
+               }
+               assertQueueEmpty();
+       }
+
+       private int tryToFetchMissingMessages() throws JMSException {
+               Connection connection = cf.createConnection();
+               connection.start();
+               Session session = connection.createSession(true, 0);
+               MessageConsumer consumer = session.createConsumer(session
+                               .createQueue(QUEUE_NAME));
+
+               int count = 0;
+               while (true) {
+                       Message message = consumer.receive(500);
+                       if (message == null)
+                               break;
+
+                       log.info("Found \"missing\" message: " + message);
+                       count++;
+               }
+
+               consumer.close();
+               session.close();
+               connection.close();
+
+               return count;
+       }
+
+       public void testAMQ1925_TXBegin() throws Exception {
+               Connection connection = cf.createConnection();
+               connection.start();
+               Session session = connection.createSession(true,
+                               Session.SESSION_TRANSACTED);
+               MessageConsumer consumer = session.createConsumer(session
+                               .createQueue(QUEUE_NAME));
+
+               for (int i = 0; i < MESSAGE_COUNT; i++) {
+                       Message message = consumer.receive(500);
+                       assertNotNull(message);
+
+                       if (i == 222) {
+                               // Simulate broker failure & restart after message 222 is received but before its commit
+                               bs.stop();
+                               bs = new BrokerService();
+                               bs.setPersistent(true);
+                               bs.setUseJmx(true);
+                               bs.addConnector(tcpUri);
+                               bs.start();
+                       }
+
+                       assertEquals(i, 
message.getIntProperty(PROPERTY_MSG_NUMBER));
+                       session.commit();
+               }
+               assertNull(consumer.receive(500));
+
+               consumer.close();
+               session.close();
+               connection.close();
+
+               assertQueueEmpty();
+       }
+
+       public void testAMQ1925_TXCommited() throws Exception {
+               Connection connection = cf.createConnection();
+               connection.start();
+               Session session = connection.createSession(true,
+                               Session.SESSION_TRANSACTED);
+               MessageConsumer consumer = session.createConsumer(session
+                               .createQueue(QUEUE_NAME));
+
+               for (int i = 0; i < MESSAGE_COUNT; i++) {
+                       Message message = consumer.receive(500);
+                       assertNotNull(message);
+
+                       assertEquals(i, 
message.getIntProperty(PROPERTY_MSG_NUMBER));
+                       session.commit();
+
+                       if (i == 222) {
+                               // Simulate broker failure & restart right after the commit for message 222
+                               bs.stop();
+                               bs = new BrokerService();
+                               bs.setPersistent(true);
+                               bs.setUseJmx(true);
+                               bs.addConnector(tcpUri);
+                               bs.start();
+                       }
+               }
+               assertNull(consumer.receive(500));
+
+               consumer.close();
+               session.close();
+               connection.close();
+
+               assertQueueEmpty();
+       }
+
+       private void assertQueueEmpty() throws Exception {
+               Connection connection = cf.createConnection();
+               connection.start();
+               Session session = connection.createSession(true,
+                               Session.SESSION_TRANSACTED);
+               MessageConsumer consumer = session.createConsumer(session
+                               .createQueue(QUEUE_NAME));
+
+               Message msg = consumer.receive(500);
+               if (msg != null) {
+                       fail(msg.toString());
+               }
+
+               consumer.close();
+               session.close();
+               connection.close();
+
+               assertQueueLength(0);
+       }
+
+       private void assertQueueLength(int len) throws Exception, IOException {
+               Set<Destination> destinations = bs.getBroker().getDestinations(
+                               new ActiveMQQueue(QUEUE_NAME));
+               Queue queue = (Queue) destinations.iterator().next();
+               assertEquals(len, queue.getMessageStore().getMessageCount());
+       }
+
+       private void sendMessagesToQueue() throws Exception {
+               Connection connection = cf.createConnection();
+               Session session = connection.createSession(true,
+                               Session.SESSION_TRANSACTED);
+               MessageProducer producer = session.createProducer(session
+                               .createQueue(QUEUE_NAME));
+
+               producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+               for (int i = 0; i < MESSAGE_COUNT; i++) {
+                       TextMessage message = session
+                                       .createTextMessage("Test message " + i);
+                       message.setIntProperty(PROPERTY_MSG_NUMBER, i);
+                       producer.send(message);
+               }
+               session.commit();
+
+               producer.close();
+               session.close();
+               connection.close();
+
+               assertQueueLength(MESSAGE_COUNT);
+       }
+
+       protected void setUp() throws Exception {
+               bs = new BrokerService();
+               bs.setPersistent(true);
+               bs.deleteAllMessages();
+               bs.setUseJmx(true);
+               TransportConnector connector = 
bs.addConnector("tcp://localhost:0");
+               bs.start();
+               tcpUri = connector.getConnectUri();
+
+               cf = new ActiveMQConnectionFactory("failover://(" + tcpUri + 
")");
+
+               sendMessagesToQueue();
+       }
+
+       protected void tearDown() throws Exception {
+               new ServiceStopper().stop(bs);
+       }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to