Author: gtully
Date: Thu Apr 25 12:47:20 2013
New Revision: 1475734
URL: http://svn.apache.org/r1475734
Log:
https://issues.apache.org/jira/browse/AMQ-4485 - ensure cursor updates in same
order as store orderindex via beforeCompletion with index lock. The before
completion tracks ordered work that first thread completes as a unit. All
updates to a destination are combined to a single sync, such that there is no
cursor contention between transactions
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
(with props)
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Apr 25 12:47:20 2013
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
@@ -83,6 +84,7 @@ import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
@@ -734,6 +736,120 @@ public class Queue extends BaseDestinati
}
}
+ final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new
ConcurrentHashMap<Transaction, SendSync>();
+ private volatile LinkedList<Transaction> orderIndexUpdates = new
LinkedList<Transaction>();
+
+ // roll up all message sends
+ class SendSync extends Synchronization {
+
+ class MessageContext {
+ public Message message;
+ public ConnectionContext context;
+
+ public MessageContext(ConnectionContext context, Message message) {
+ this.context = context;
+ this.message = message;
+ }
+ }
+
+ final Transaction transaction;
+ List<MessageContext> additions = new ArrayList<MessageContext>();
+
+ public SendSync(Transaction transaction) {
+ this.transaction = transaction;
+ }
+
+ public void add(ConnectionContext context, Message message) {
+ additions.add(new MessageContext(context, message));
+ }
+
+ @Override
+ public void beforeCommit() throws Exception {
+ synchronized (sendLock) {
+ orderIndexUpdates.addLast(transaction);
+ }
+ }
+
+ @Override
+ public void afterCommit() throws Exception {
+ LinkedList<Transaction> orderedWork = null;
+ // use existing object to sync orderIndexUpdates that can be
reassigned
+ synchronized (sendLock) {
+ if (transaction == orderIndexUpdates.peek()) {
+ orderedWork = orderIndexUpdates;
+ orderIndexUpdates = new LinkedList<Transaction>();
+
+ // talking all the ordered work means that earlier
+ // and later threads do nothing.
+ // this avoids contention/race on the sendLock that
+ // guards the actual work.
+ }
+ }
+ // do the ordered work
+ if (orderedWork != null) {
+ sendLock.lockInterruptibly();
+ try {
+ for (Transaction tx : orderedWork) {
+ sendSyncs.get(tx).processSend();
+ }
+ } finally {
+ sendLock.unlock();
+ }
+ for (Transaction tx : orderedWork) {
+ sendSyncs.get(tx).processSent();
+ }
+ sendSyncs.remove(transaction);
+ }
+ }
+
+ // called with sendLock
+ private void processSend() throws Exception {
+
+ for (Iterator<MessageContext> iterator = additions.iterator();
iterator.hasNext(); ) {
+ MessageContext messageContext = iterator.next();
+ // It could take while before we receive the commit
+ // op, by that time the message could have expired..
+ if (broker.isExpired(messageContext.message)) {
+ broker.messageExpired(messageContext.context,
messageContext.message, null);
+ destinationStatistics.getExpired().increment();
+ iterator.remove();
+ continue;
+ }
+ sendMessage(messageContext.message);
+ messageContext.message.decrementReferenceCount();
+ }
+ }
+
+ private void processSent() throws Exception {
+ for (MessageContext messageContext : additions) {
+ messageSent(messageContext.context, messageContext.message);
+ }
+ }
+
+ @Override
+ public void afterRollback() throws Exception {
+ try {
+ for (MessageContext messageContext : additions) {
+ messageContext.message.decrementReferenceCount();
+ }
+ } finally {
+ sendSyncs.remove(transaction);
+ }
+ }
+ }
+
+ // called while holding the sendLock
+ private void registerSendSync(Message message, ConnectionContext context) {
+ final Transaction transaction = context.getTransaction();
+ Queue.SendSync currentSync = sendSyncs.get(transaction);
+ if (currentSync == null) {
+ currentSync = new Queue.SendSync(transaction);
+ transaction.addSynchronization(currentSync);
+ sendSyncs.put(transaction, currentSync);
+ }
+ currentSync.add(context, message);
+ }
+
void doMessageSend(final ProducerBrokerExchange producerExchange, final
Message message) throws IOException,
Exception {
final ConnectionContext context =
producerExchange.getConnectionContext();
@@ -759,30 +875,7 @@ public class Queue extends BaseDestinati
// our memory. This increment is decremented once the tx
finishes..
message.incrementReferenceCount();
- context.getTransaction().addSynchronization(new
Synchronization() {
- @Override
- public void afterCommit() throws Exception {
- sendLock.lockInterruptibly();
- try {
- // It could take while before we receive the commit
- // op, by that time the message could have
expired..
- if (broker.isExpired(message)) {
- broker.messageExpired(context, message, null);
- destinationStatistics.getExpired().increment();
- return;
- }
- sendMessage(message);
- } finally {
- sendLock.unlock();
- message.decrementReferenceCount();
- }
- messageSent(context, message);
- }
- @Override
- public void afterRollback() throws Exception {
- message.decrementReferenceCount();
- }
- });
+ registerSendSync(message, context);
} else {
// Add to the pending list, this takes care of incrementing the
// usage manager.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
Thu Apr 25 12:47:20 2013
@@ -128,7 +128,7 @@ public abstract class Transaction {
@Override
public String toString() {
- return super.toString() + "[synchronizations=" + synchronizations +
"]";
+ return "Local-" + getTransactionId() + "[synchronizations=" +
synchronizations + "]";
}
public abstract void commit(boolean onePhase) throws XAException,
IOException;
Modified:
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
---
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Thu Apr 25 12:47:20 2013
@@ -43,7 +43,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
-import java.util.Stack;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -833,7 +832,7 @@ public abstract class MessageDatabase ex
lastRecoveryPosition = nextRecoveryPosition;
metadata.lastUpdate = lastRecoveryPosition;
JournalCommand<?> message = load(lastRecoveryPosition);
- process(message, lastRecoveryPosition, (Runnable)null);
+ process(message, lastRecoveryPosition, (Runnable)null,
(Runnable)null);
nextRecoveryPosition =
journal.getNextLocation(lastRecoveryPosition);
}
} finally {
@@ -913,10 +912,7 @@ public abstract class MessageDatabase ex
* the JournalMessage is used to update the index just like it would be
done
* during a recovery process.
*/
- public Location store(JournalCommand<?> data, boolean sync, Runnable
before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
- if (before != null) {
- before.run();
- }
+ public Location store(JournalCommand<?> data, boolean sync, Runnable
before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
try {
ByteSequence sequence = toByteSequence(data);
@@ -927,7 +923,7 @@ public abstract class MessageDatabase ex
long start = System.currentTimeMillis();
location = onJournalStoreComplete == null ?
journal.write(sequence, sync) : journal.write(sequence,
onJournalStoreComplete) ;
long start2 = System.currentTimeMillis();
- process(data, location, after);
+ process(data, location, before, after);
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start >
LOG_SLOW_ACCESS_TIME) {
@@ -940,18 +936,7 @@ public abstract class MessageDatabase ex
checkpointLock.readLock().unlock();
}
if (after != null) {
- Runnable afterCompletion = null;
- synchronized (orderedTransactionAfters) {
- if (!orderedTransactionAfters.empty()) {
- afterCompletion = orderedTransactionAfters.pop();
- }
- }
- if (afterCompletion != null) {
- afterCompletion.run();
- } else {
- // non persistent message case
- after.run();
- }
+ after.run();
}
if (checkpointThread != null && !checkpointThread.isAlive()) {
@@ -1004,7 +989,7 @@ public abstract class MessageDatabase ex
*/
void process(JournalCommand<?> data, final Location location, final
Location inDoubtlocation) throws IOException {
if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >=
0) {
- process(data, location, (Runnable) null);
+ process(data, location, (Runnable) null, (Runnable) null);
} else {
// just recover producer audit
data.visit(new Visitor() {
@@ -1022,7 +1007,7 @@ public abstract class MessageDatabase ex
// from the recovery method too so they need to be idempotent
// /////////////////////////////////////////////////////////////////
- void process(JournalCommand<?> data, final Location location, final
Runnable after) throws IOException {
+ void process(JournalCommand<?> data, final Location location, final
Runnable before, final Runnable after) throws IOException {
data.visit(new Visitor() {
@Override
public void visit(KahaAddMessageCommand command) throws
IOException {
@@ -1041,7 +1026,7 @@ public abstract class MessageDatabase ex
@Override
public void visit(KahaCommitCommand command) throws IOException {
- process(command, location, after);
+ process(command, location, before, after);
}
@Override
@@ -1153,17 +1138,8 @@ public abstract class MessageDatabase ex
}
}
- private final Stack<Runnable> orderedTransactionAfters = new
Stack<Runnable>();
- private void push(Runnable after) {
- if (after != null) {
- synchronized (orderedTransactionAfters) {
- orderedTransactionAfters.push(after);
- }
- }
- }
-
@SuppressWarnings("rawtypes")
- protected void process(KahaCommitCommand command, Location location, final
Runnable after) throws IOException {
+ protected void process(KahaCommitCommand command, Location location, final
Runnable before, final Runnable after) throws IOException {
TransactionId key =
TransactionIdConversion.convert(command.getTransactionInfo());
List<Operation> inflightTx;
synchronized (inflightTransactions) {
@@ -1173,9 +1149,9 @@ public abstract class MessageDatabase ex
}
}
if (inflightTx == null) {
- if (after != null) {
- // since we don't push this after and we may find another,
lets run it now
- after.run();
+ // only non persistent messages in this tx
+ if (before != null) {
+ before.run();
}
return;
}
@@ -1183,6 +1159,10 @@ public abstract class MessageDatabase ex
final List<Operation> messagingTx = inflightTx;
this.indexLock.writeLock().lock();
try {
+ // run before with the index lock so that queue can order cursor
updates with index updates
+ if (before != null) {
+ before.run();
+ }
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
@@ -1192,7 +1172,6 @@ public abstract class MessageDatabase ex
}
});
metadata.lastUpdate = location;
- push(after);
} finally {
this.indexLock.writeLock().unlock();
}
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
---
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
(original)
+++
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
Thu Apr 25 12:47:20 2013
@@ -145,14 +145,8 @@ public class MessageExpirationTest exten
connection.send(closeConnectionInfo(connectionInfo2));
}
- /**
- * Small regression. Looks like persistent messages to a queue are not
being
- * timedout when in a long transaction. See:
- * http://issues.apache.org/activemq/browse/AMQ-1269 Commenting out the
- * DeliveryMode.PERSISTENT test combination for now.
- */
public void initCombosForTestMessagesInLongTransactionExpire() {
- addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.PERSISTENT),
Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
addCombinationValues("destinationType", new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
}
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
---
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Thu Apr 25 12:47:20 2013
@@ -88,7 +88,8 @@ public class NegativeQueueTest extends A
private static final long MEMORY_USAGE = 400000000;
private static final long TEMP_USAGE = 200000000;
private static final long STORE_USAGE = 1000000000;
- private static final int MESSAGE_COUNT = 1100;
+ // ensure we exceed the cache 70%
+ private static final int MESSAGE_COUNT = 2100;
protected static final boolean TRANSACTED = true;
protected static final boolean DEBUG = true;
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java?rev=1475734&view=auto
==============================================================================
---
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
(added)
+++
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
Thu Apr 25 12:47:20 2013
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4485Test extends TestCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(AMQ4485Test.class);
+ BrokerService broker;
+ ActiveMQConnectionFactory factory;
+ final int messageCount = 20;
+ int memoryLimit = 40 * 1024;
+ final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." +
this.getClass().getName());
+ final Vector<Throwable> exceptions = new Vector<Throwable>();
+ final CountDownLatch slowSendResume = new CountDownLatch(1);
+
+
+ protected void configureBroker(long memoryLimit) throws Exception {
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setAdvisorySupport(false);
+
+ PolicyEntry policy = new PolicyEntry();
+ policy.setExpireMessagesPeriod(0);
+ policy.setMemoryLimit(memoryLimit);
+ policy.setProducerFlowControl(false);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
+
+ broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+ @Override
+ public void send(ProducerBrokerExchange producerExchange, final
Message messageSend) throws Exception {
+ if (messageSend.isInTransaction() &&
messageSend.getProperty("NUM") != null) {
+ final Integer num = (Integer)
messageSend.getProperty("NUM");
+ if (true) {
+ TransactionBroker transactionBroker =
(TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
+
transactionBroker.getTransaction(producerExchange.getConnectionContext(),
messageSend.getTransactionId(), false).addSynchronization(
+ new Synchronization() {
+ public void afterCommit() throws Exception
{
+ LOG.error("AfterCommit, NUM:" + num +
", " + messageSend.getMessageId() + ", tx: " + messageSend.getTransactionId());
+ if (num == 5) {
+ // we want to add to cursor after
usage is exhausted by message 20 and when
+ // all other messages have been
processed
+ LOG.error("Pausing on latch in
afterCommit for: " + num + ", " + messageSend.getMessageId());
+ slowSendResume.await(20,
TimeUnit.SECONDS);
+ LOG.error("resuming on latch
afterCommit for: " + num + ", " + messageSend.getMessageId());
+ } else if (messageCount + 1 == num) {
+ LOG.error("releasing latch. " +
num + ", " + messageSend.getMessageId());
+ slowSendResume.countDown();
+ // for message X, we need to delay
so message 5 can setBatch
+ TimeUnit.SECONDS.sleep(5);
+ LOG.error("resuming afterCommit
for: " + num + ", " + messageSend.getMessageId());
+ }
+ }
+ });
+ }
+ }
+ super.send(producerExchange, messageSend);
+ }
+ }
+ });
+
+ }
+
+
+ public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws
Exception {
+
+ Set<Integer> expected = new HashSet<Integer>();
+ final Vector<Session> sessionVector = new Vector<Session>();
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ for (int i = 1; i <= messageCount; i++) {
+ sessionVector.add(send(i, 1, true));
+ expected.add(i);
+ }
+
+ // get parallel commit so that the sync writes are batched
+ for (int i = 0; i < messageCount; i++) {
+ final int id = i;
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ sessionVector.get(id).commit();
+ } catch (Exception fail) {
+ exceptions.add(fail);
+ }
+ }
+ });
+ }
+
+ final DestinationViewMBean queueViewMBean = (DestinationViewMBean)
+
broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0],
DestinationViewMBean.class, false);
+
+ // not sure how many messages will get enqueued
+ TimeUnit.SECONDS.sleep(3);
+ if (false)
+ assertTrue("all " + messageCount + " on the q", Wait.waitFor(new
Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount());
+ return messageCount == queueViewMBean.getEnqueueCount();
+ }
+ }));
+
+ LOG.info("Big send to blow available destination usage before slow
send resumes");
+ send(messageCount + 1, 35*1024, true).commit();
+
+
+ // consume and verify all received
+ Connection cosumerConnection = factory.createConnection();
+ cosumerConnection.start();
+ MessageConsumer consumer = cosumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
+ for (int i = 1; i <= messageCount + 1; i++) {
+ BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000);
+ assertNotNull("Got message: " + i + ", " + expected, bytesMessage);
+ MessageId mqMessageId = ((ActiveMQBytesMessage)
bytesMessage).getMessageId();
+ LOG.info("got: " + expected + ", " + mqMessageId + ", NUM=" +
((ActiveMQBytesMessage) bytesMessage).getProperty("NUM"));
+ expected.remove(((ActiveMQBytesMessage)
bytesMessage).getProperty("NUM"));
+ }
+ }
+
+ private Session send(int id, int messageSize, boolean transacted) throws
Exception {
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(transacted, transacted ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[messageSize]);
+ bytesMessage.setIntProperty("NUM", id);
+ producer.send(bytesMessage);
+ LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " +
((ActiveMQBytesMessage) bytesMessage).getTransactionId());
+ return session;
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ broker = new BrokerService();
+ broker.setBrokerName("thisOne");
+ configureBroker(memoryLimit);
+ broker.start();
+ factory = new
ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+ factory.setWatchTopicAdvisories(false);
+
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ broker = null;
+ }
+ }
+
+}
\ No newline at end of file
Propchange:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date