Author: gtully
Date: Thu Apr 25 12:47:20 2013
New Revision: 1475734

URL: http://svn.apache.org/r1475734
Log:
https://issues.apache.org/jira/browse/AMQ-4485 - ensure cursor updates occur in the same 
order as the store orderIndex via a beforeCompletion run with the index lock. The 
beforeCompletion tracks ordered work that the first thread completes as a unit. All 
updates to a destination are combined into a single sync, so that there is no 
cursor contention between transactions.

Added:
    
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
   (with props)
Modified:
    
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
    
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
    
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java

Modified: 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
 Thu Apr 25 12:47:20 2013
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.DelayQueue;
@@ -83,6 +84,7 @@ import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transaction.Transaction;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.activemq.util.BrokerSupport;
@@ -734,6 +736,120 @@ public class Queue extends BaseDestinati
         }
     }
 
+    final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new 
ConcurrentHashMap<Transaction, SendSync>();
+    private volatile LinkedList<Transaction> orderIndexUpdates = new 
LinkedList<Transaction>();
+
+    // roll up all message sends
+    class SendSync extends Synchronization {
+
+        class MessageContext {
+            public Message message;
+            public ConnectionContext context;
+
+            public MessageContext(ConnectionContext context, Message message) {
+                this.context = context;
+                this.message = message;
+            }
+        }
+
+        final Transaction transaction;
+        List<MessageContext> additions = new ArrayList<MessageContext>();
+
+        public SendSync(Transaction transaction) {
+            this.transaction = transaction;
+        }
+
+        public void add(ConnectionContext context, Message message) {
+            additions.add(new MessageContext(context, message));
+        }
+
+        @Override
+        public void beforeCommit() throws Exception {
+            synchronized (sendLock) {
+                orderIndexUpdates.addLast(transaction);
+            }
+        }
+
+        @Override
+        public void afterCommit() throws Exception {
+            LinkedList<Transaction> orderedWork = null;
+            // use existing object to sync orderIndexUpdates that can be 
reassigned
+            synchronized (sendLock) {
+                if (transaction == orderIndexUpdates.peek()) {
+                    orderedWork = orderIndexUpdates;
+                    orderIndexUpdates = new LinkedList<Transaction>();
+
+                    // taking all the ordered work means that earlier
+                    // and later threads do nothing.
+                    // this avoids contention/race on the sendLock that
+                    // guards the actual work.
+                }
+            }
+            // do the ordered work
+            if (orderedWork != null) {
+                sendLock.lockInterruptibly();
+                try {
+                    for (Transaction tx : orderedWork) {
+                        sendSyncs.get(tx).processSend();
+                    }
+                } finally {
+                    sendLock.unlock();
+                }
+                for (Transaction tx : orderedWork) {
+                    sendSyncs.get(tx).processSent();
+                }
+                sendSyncs.remove(transaction);
+            }
+        }
+
+        // called with sendLock
+        private void processSend() throws Exception {
+
+            for (Iterator<MessageContext> iterator = additions.iterator(); 
iterator.hasNext(); ) {
+                MessageContext messageContext = iterator.next();
+                // It could take a while before we receive the commit
+                // op, by that time the message could have expired.
+                if (broker.isExpired(messageContext.message)) {
+                    broker.messageExpired(messageContext.context, 
messageContext.message, null);
+                    destinationStatistics.getExpired().increment();
+                    iterator.remove();
+                    continue;
+                }
+                sendMessage(messageContext.message);
+                messageContext.message.decrementReferenceCount();
+            }
+        }
+
+        private void processSent() throws Exception {
+            for (MessageContext messageContext : additions) {
+                messageSent(messageContext.context, messageContext.message);
+            }
+        }
+
+        @Override
+        public void afterRollback() throws Exception {
+            try {
+                for (MessageContext messageContext : additions) {
+                    messageContext.message.decrementReferenceCount();
+                }
+            } finally {
+                sendSyncs.remove(transaction);
+            }
+        }
+    }
+
+    // called while holding the sendLock
+    private void registerSendSync(Message message, ConnectionContext context) {
+        final Transaction transaction = context.getTransaction();
+        Queue.SendSync currentSync = sendSyncs.get(transaction);
+        if (currentSync == null) {
+            currentSync = new Queue.SendSync(transaction);
+            transaction.addSynchronization(currentSync);
+            sendSyncs.put(transaction, currentSync);
+        }
+        currentSync.add(context, message);
+    }
+
     void doMessageSend(final ProducerBrokerExchange producerExchange, final 
Message message) throws IOException,
             Exception {
         final ConnectionContext context = 
producerExchange.getConnectionContext();
@@ -759,30 +875,7 @@ public class Queue extends BaseDestinati
                 // our memory. This increment is decremented once the tx 
finishes..
                 message.incrementReferenceCount();
 
-                context.getTransaction().addSynchronization(new 
Synchronization() {
-                    @Override
-                    public void afterCommit() throws Exception {
-                        sendLock.lockInterruptibly();
-                        try {
-                            // It could take while before we receive the commit
-                            // op, by that time the message could have 
expired..
-                            if (broker.isExpired(message)) {
-                                broker.messageExpired(context, message, null);
-                                destinationStatistics.getExpired().increment();
-                                return;
-                            }
-                            sendMessage(message);
-                        } finally {
-                            sendLock.unlock();
-                            message.decrementReferenceCount();
-                        }
-                        messageSent(context, message);
-                    }
-                    @Override
-                    public void afterRollback() throws Exception {
-                        message.decrementReferenceCount();
-                    }
-                });
+                registerSendSync(message, context);
             } else {
                 // Add to the pending list, this takes care of incrementing the
                 // usage manager.

Modified: 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
 (original)
+++ 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
 Thu Apr 25 12:47:20 2013
@@ -128,7 +128,7 @@ public abstract class Transaction {
 
     @Override
     public String toString() {
-        return super.toString() + "[synchronizations=" + synchronizations + 
"]";
+        return "Local-" + getTransactionId() + "[synchronizations=" + 
synchronizations + "]";
     }
 
     public abstract void commit(boolean onePhase) throws XAException, 
IOException;

Modified: 
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- 
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 (original)
+++ 
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 Thu Apr 25 12:47:20 2013
@@ -43,7 +43,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.Stack;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -833,7 +832,7 @@ public abstract class MessageDatabase ex
                 lastRecoveryPosition = nextRecoveryPosition;
                 metadata.lastUpdate = lastRecoveryPosition;
                 JournalCommand<?> message = load(lastRecoveryPosition);
-                process(message, lastRecoveryPosition, (Runnable)null);
+                process(message, lastRecoveryPosition, (Runnable)null, 
(Runnable)null);
                 nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
             }
         } finally {
@@ -913,10 +912,7 @@ public abstract class MessageDatabase ex
      * the JournalMessage is used to update the index just like it would be 
done
      * during a recovery process.
      */
-    public Location store(JournalCommand<?> data, boolean sync, Runnable 
before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
-        if (before != null) {
-            before.run();
-        }
+    public Location store(JournalCommand<?> data, boolean sync, Runnable 
before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
         try {
             ByteSequence sequence = toByteSequence(data);
 
@@ -927,7 +923,7 @@ public abstract class MessageDatabase ex
                 long start = System.currentTimeMillis();
                 location = onJournalStoreComplete == null ? 
journal.write(sequence, sync) :  journal.write(sequence, 
onJournalStoreComplete) ;
                 long start2 = System.currentTimeMillis();
-                process(data, location, after);
+                process(data, location, before, after);
 
                 long end = System.currentTimeMillis();
                 if( LOG_SLOW_ACCESS_TIME>0 && end-start > 
LOG_SLOW_ACCESS_TIME) {
@@ -940,18 +936,7 @@ public abstract class MessageDatabase ex
                 checkpointLock.readLock().unlock();
             }
             if (after != null) {
-                Runnable afterCompletion = null;
-                synchronized (orderedTransactionAfters) {
-                    if (!orderedTransactionAfters.empty()) {
-                        afterCompletion = orderedTransactionAfters.pop();
-                    }
-                }
-                if (afterCompletion != null) {
-                    afterCompletion.run();
-                } else {
-                    // non persistent message case
-                    after.run();
-                }
+                after.run();
             }
 
             if (checkpointThread != null && !checkpointThread.isAlive()) {
@@ -1004,7 +989,7 @@ public abstract class MessageDatabase ex
      */
     void process(JournalCommand<?> data, final Location location, final 
Location inDoubtlocation) throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 
0) {
-            process(data, location, (Runnable) null);
+            process(data, location, (Runnable) null, (Runnable) null);
         } else {
             // just recover producer audit
             data.visit(new Visitor() {
@@ -1022,7 +1007,7 @@ public abstract class MessageDatabase ex
     // from the recovery method too so they need to be idempotent
     // /////////////////////////////////////////////////////////////////
 
-    void process(JournalCommand<?> data, final Location location, final 
Runnable after) throws IOException {
+    void process(JournalCommand<?> data, final Location location, final 
Runnable before, final Runnable after) throws IOException {
         data.visit(new Visitor() {
             @Override
             public void visit(KahaAddMessageCommand command) throws 
IOException {
@@ -1041,7 +1026,7 @@ public abstract class MessageDatabase ex
 
             @Override
             public void visit(KahaCommitCommand command) throws IOException {
-                process(command, location, after);
+                process(command, location, before, after);
             }
 
             @Override
@@ -1153,17 +1138,8 @@ public abstract class MessageDatabase ex
         }
     }
 
-    private final Stack<Runnable> orderedTransactionAfters = new 
Stack<Runnable>();
-    private void push(Runnable after) {
-        if (after != null) {
-            synchronized (orderedTransactionAfters) {
-                orderedTransactionAfters.push(after);
-            }
-        }
-    }
-
     @SuppressWarnings("rawtypes")
-    protected void process(KahaCommitCommand command, Location location, final 
Runnable after) throws IOException {
+    protected void process(KahaCommitCommand command, Location location, final 
Runnable before, final Runnable after) throws IOException {
         TransactionId key = 
TransactionIdConversion.convert(command.getTransactionInfo());
         List<Operation> inflightTx;
         synchronized (inflightTransactions) {
@@ -1173,9 +1149,9 @@ public abstract class MessageDatabase ex
             }
         }
         if (inflightTx == null) {
-            if (after != null) {
-                // since we don't push this after and we may find another, 
lets run it now
-                after.run();
+            // only non persistent messages in this tx
+            if (before != null) {
+                before.run();
             }
             return;
         }
@@ -1183,6 +1159,10 @@ public abstract class MessageDatabase ex
         final List<Operation> messagingTx = inflightTx;
         this.indexLock.writeLock().lock();
         try {
+            // run before with the index lock so that queue can order cursor 
updates with index updates
+            if (before != null) {
+                before.run();
+            }
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 @Override
                 public void execute(Transaction tx) throws IOException {
@@ -1192,7 +1172,6 @@ public abstract class MessageDatabase ex
                 }
             });
             metadata.lastUpdate = location;
-            push(after);
         } finally {
             this.indexLock.writeLock().unlock();
         }

Modified: 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
 (original)
+++ 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
 Thu Apr 25 12:47:20 2013
@@ -145,14 +145,8 @@ public class MessageExpirationTest exten
         connection.send(closeConnectionInfo(connectionInfo2));
     }
 
-    /**
-     * Small regression. Looks like persistent messages to a queue are not 
being
-     * timedout when in a long transaction. See:
-     * http://issues.apache.org/activemq/browse/AMQ-1269 Commenting out the
-     * DeliveryMode.PERSISTENT test combination for now.
-     */
     public void initCombosForTestMessagesInLongTransactionExpire() {
-        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
+        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.PERSISTENT), 
Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
         addCombinationValues("destinationType", new Object[] 
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
                                                               
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
     }

Modified: 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
 (original)
+++ 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
 Thu Apr 25 12:47:20 2013
@@ -88,7 +88,8 @@ public class NegativeQueueTest extends A
     private static final long MEMORY_USAGE = 400000000;
     private static final long TEMP_USAGE = 200000000;
     private static final long STORE_USAGE = 1000000000;
-    private static final int MESSAGE_COUNT = 1100;
+    // ensure we exceed the cache 70%
+    private static final int MESSAGE_COUNT = 2100;
 
     protected static final boolean TRANSACTED = true;
     protected static final boolean DEBUG = true;

Added: 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java?rev=1475734&view=auto
==============================================================================
--- 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
 (added)
+++ 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
 Thu Apr 25 12:47:20 2013
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4485Test extends TestCase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ4485Test.class);
+    BrokerService broker;
+    ActiveMQConnectionFactory factory;
+    final int messageCount = 20;
+    int memoryLimit = 40 * 1024;
+    final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + 
this.getClass().getName());
+    final Vector<Throwable> exceptions = new Vector<Throwable>();
+    final CountDownLatch slowSendResume = new CountDownLatch(1);
+
+
+    protected void configureBroker(long memoryLimit) throws Exception {
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setAdvisorySupport(false);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setExpireMessagesPeriod(0);
+        policy.setMemoryLimit(memoryLimit);
+        policy.setProducerFlowControl(false);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+            @Override
+            public void send(ProducerBrokerExchange producerExchange, final 
Message messageSend) throws Exception {
+                if (messageSend.isInTransaction() && 
messageSend.getProperty("NUM") != null) {
+                    final Integer num = (Integer) 
messageSend.getProperty("NUM");
+                    if (true) {
+                        TransactionBroker transactionBroker = 
(TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
+                        
transactionBroker.getTransaction(producerExchange.getConnectionContext(), 
messageSend.getTransactionId(), false).addSynchronization(
+                                new Synchronization() {
+                                    public void afterCommit() throws Exception 
{
+                                        LOG.error("AfterCommit, NUM:" + num + 
", " + messageSend.getMessageId() + ", tx: " + messageSend.getTransactionId());
+                                        if (num == 5) {
+                                            // we want to add to cursor after 
usage is exhausted by message 20 and when
+                                            // all other messages have been 
processed
+                                            LOG.error("Pausing on latch in 
afterCommit for: " + num + ", " + messageSend.getMessageId());
+                                            slowSendResume.await(20, 
TimeUnit.SECONDS);
+                                            LOG.error("resuming on latch 
afterCommit for: " + num + ", " + messageSend.getMessageId());
+                                        } else if (messageCount + 1 == num) {
+                                            LOG.error("releasing latch. " + 
num + ", " + messageSend.getMessageId());
+                                            slowSendResume.countDown();
+                                            // for message X, we need to delay 
so message 5 can setBatch
+                                            TimeUnit.SECONDS.sleep(5);
+                                            LOG.error("resuming afterCommit 
for: " + num + ", " + messageSend.getMessageId());
+                                        }
+                                    }
+                                });
+                    }
+                }
+                super.send(producerExchange, messageSend);
+            }
+        }
+        });
+
+    }
+
+
+    public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws 
Exception {
+
+        Set<Integer> expected = new HashSet<Integer>();
+        final Vector<Session> sessionVector = new Vector<Session>();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        for (int i = 1; i <= messageCount; i++) {
+           sessionVector.add(send(i, 1, true));
+           expected.add(i);
+        }
+
+        // get parallel commit so that the sync writes are batched
+        for (int i = 0; i < messageCount; i++) {
+            final int id = i;
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        sessionVector.get(id).commit();
+                    } catch (Exception fail) {
+                        exceptions.add(fail);
+                    }
+                }
+            });
+        }
+
+        final DestinationViewMBean queueViewMBean = (DestinationViewMBean)
+                
broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0],
 DestinationViewMBean.class, false);
+
+        // not sure how many messages will get enqueued
+        TimeUnit.SECONDS.sleep(3);
+        if (false)
+        assertTrue("all " + messageCount + " on the q", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount());
+                return messageCount == queueViewMBean.getEnqueueCount();
+            }
+        }));
+
+        LOG.info("Big send to blow available destination usage before slow 
send resumes");
+        send(messageCount + 1, 35*1024, true).commit();
+
+
+        // consume and verify all received
+        Connection cosumerConnection = factory.createConnection();
+        cosumerConnection.start();
+        MessageConsumer consumer = cosumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
+        for (int i = 1; i <= messageCount + 1; i++) {
+            BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000);
+            assertNotNull("Got message: " + i + ", " + expected, bytesMessage);
+            MessageId mqMessageId = ((ActiveMQBytesMessage) 
bytesMessage).getMessageId();
+            LOG.info("got: " + expected + ", "  + mqMessageId + ", NUM=" + 
((ActiveMQBytesMessage) bytesMessage).getProperty("NUM"));
+            expected.remove(((ActiveMQBytesMessage) 
bytesMessage).getProperty("NUM"));
+        }
+    }
+
+    private Session send(int id, int messageSize, boolean transacted) throws 
Exception {
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(transacted, transacted ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        BytesMessage bytesMessage = session.createBytesMessage();
+        bytesMessage.writeBytes(new byte[messageSize]);
+        bytesMessage.setIntProperty("NUM", id);
+        producer.send(bytesMessage);
+        LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " + 
((ActiveMQBytesMessage) bytesMessage).getTransactionId());
+        return session;
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.setBrokerName("thisOne");
+        configureBroker(memoryLimit);
+        broker.start();
+        factory = new 
ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
+
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            broker = null;
+        }
+    }
+
+}
\ No newline at end of file

Propchange: 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to