Author: ritchiem
Date: Mon Mar 9 15:57:33 2009
New Revision: 751718
URL: http://svn.apache.org/viewvc?rev=751718&view=rev
Log:
QPID-949 : Removed getMessage() calls wherever possible, as calling it causes a
flowed message to be read back into memory from disk. In every removed instance
the call was only used to reach methods that already exist on the QueueEntry.
Added a getMessageId() accessor to QueueEntry. The remaining getMessage() calls
have been left in to perform the NO_LOCAL work; moving the Publisher and
PublisherClient identifiers to QueueEntryImpl (QEI) would remove this need.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Mon Mar 9 15:57:33 2009
@@ -431,7 +431,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Adding unacked message(" +
entry.getMessage().toString() + " DT:" + deliveryTag
+ _log.debug(debugIdentity() + " Adding unacked message(" +
entry.toString() + " DT:" + deliveryTag
+ ") with a queue(" + entry.getQueue() + ") for
" + subscription);
}
}
@@ -551,7 +551,7 @@
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue
of message(" + unacked.getMessage().debugIdentity()
+ _log.warn(System.identityHashCode(this) + " Requested requeue
of message(" + unacked.debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no
DeadLetter queue so DROPPING message.");
unacked.dequeueAndDelete(_storeContext);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
Mon Mar 9 15:57:33 2009
@@ -102,7 +102,7 @@
//buffer must be marked as persistent:
for (QueueEntry msg : _unacked.values())
{
- if (msg.getMessage().isPersistent())
+ if (msg.isPersistent())
{
return true;
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
Mon Mar 9 15:57:33 2009
@@ -96,7 +96,7 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" +
queueEntry.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" +
queueEntry.debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Mon Mar 9 15:57:33 2009
@@ -274,8 +274,9 @@
/**
* Checks if there is any notification to be send to the listeners
+ * @param queueEntry
*/
- public void checkForNotification(AMQMessage msg) throws AMQException
+ public void checkForNotification(QueueEntry queueEntry) throws AMQException
{
final Set<NotificationCheck> notificationChecks =
_queue.getNotificationChecks();
@@ -289,7 +290,7 @@
{
if (check.isMessageSpecific() ||
(_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- if (check.notifyIfNecessary(msg, _queue, this))
+ if (check.notifyIfNecessary(queueEntry, _queue, this))
{
_lastNotificationTimes[check.ordinal()] = currentTime;
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
Mon Mar 9 15:57:33 2009
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue,
QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue,
QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,19 +39,19 @@
},
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue,
QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue,
QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize = (msg == null) ? 0 :
msg.getContentHeaderBody().bodySize;
+ long messageSize = (queueEntry == null) ? 0 :
queueEntry.getSize();
if (messageSize >= maximumMessageSize)
{
listener.notifyClients(this, queue, messageSize + "b :
Maximum message size threshold (" +
maximumMessageSize +
") breached. [Message ID=" +
- (msg == null ? "null"
: msg.getMessageId()) + "]");
+ (queueEntry == null ?
"null" : queueEntry.getMessageId()) + "]");
return true;
}
}
@@ -63,7 +61,7 @@
},
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue,
QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue,
QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -84,7 +82,7 @@
},
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue,
QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue,
QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -126,6 +124,6 @@
return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue,
QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue,
QueueNotificationListener listener);
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Mon Mar 9 15:57:33 2009
@@ -135,6 +135,8 @@
AMQMessage getMessage();
+ Long getMessageId();
+
long getSize();
/**
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Mon Mar 9 15:57:33 2009
@@ -83,6 +83,7 @@
private long _expiration;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE |
DELIVERED_TO_CONSUMER);
+ private boolean _persistent;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
@@ -111,6 +112,7 @@
_flags |= IMMEDIATE;
}
_expiration = message.getExpiration();
+ _persistent = message.isPersistent();
}
_backingStore = queueEntryList.getBackingStore();
_flowed = new AtomicBoolean(false);
@@ -140,6 +142,11 @@
return _message;
}
+ public Long getMessageId()
+ {
+ return _messageId;
+ }
+
public long getSize()
{
return _messageSize;
@@ -245,12 +252,12 @@
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _message.getContentHeaderBody();
+ return getMessage().getContentHeaderBody();
}
public boolean isPersistent() throws AMQException
{
- return _message.isPersistent();
+ return _persistent;
}
public boolean isRedelivered()
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Mon Mar 9 15:57:33 2009
@@ -417,7 +417,7 @@
deliverAsync();
}
- _managedObject.checkForNotification(entry.getMessage());
+ _managedObject.checkForNotification(entry);
return entry;
}
@@ -567,10 +567,9 @@
try
{
- AMQMessage msg = entry.getMessage();
- if (msg.isPersistent())
+ if (entry.isPersistent())
{
- _virtualHost.getTransactionLog().dequeueMessage(storeContext,
this, msg.getMessageId());
+ _virtualHost.getTransactionLog().dequeueMessage(storeContext,
this, entry.getMessageId());
}
}
@@ -761,7 +760,7 @@
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
return messageId >= fromMessageId && messageId <= toMessageId;
}
@@ -780,7 +779,7 @@
public boolean accept(QueueEntry entry)
{
- _complete = entry.getMessage().getMessageId() == messageId;
+ _complete = entry.getMessageId() == messageId;
return _complete;
}
@@ -829,7 +828,7 @@
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
return (messageId >= fromMessageId)
&& (messageId <= toMessageId)
&& entry.acquire();
@@ -848,11 +847,9 @@
// Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
-
- if (message.isPersistent() && toQueue.isDurable())
+ if (entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue,
message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue,
entry.getMessageId());
}
// dequeue will remove the messages from the queue
entry.dequeue(storeContext);
@@ -887,6 +884,9 @@
for (QueueEntry entry : entries)
{
toQueue.enqueue(storeContext, entry.getMessage());
+ // As we only did a dequeue above now that we have moved the
message we should perform a delete.
+ // We cannot do this earlier as the message will be lost if
flowed.
+ //entry.delete();
}
}
catch (MessageCleanupException e)
@@ -913,7 +913,7 @@
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId))
{
@@ -939,11 +939,9 @@
// Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
-
- if (!entry.isDeleted() && message.isPersistent() &&
toQueue.isDurable())
+ if (!entry.isDeleted() && entry.isPersistent() &&
toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue,
message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue,
entry.getMessageId());
}
}
@@ -1002,7 +1000,7 @@
{
QueueEntry node = queueListIterator.getNode();
- final long messageId = node.getMessage().getMessageId();
+ final long messageId = node.getMessageId();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
@@ -1438,7 +1436,7 @@
}
else
{
- _managedObject.checkForNotification(node.getMessage());
+ _managedObject.checkForNotification(node);
}
}
@@ -1605,7 +1603,7 @@
for (int i = 0; i < num && !it.atTail(); i++)
{
it.advance();
- ids.add(it.getNode().getMessage().getMessageId());
+ ids.add(it.getNode().getMessageId());
}
return ids;
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
Mon Mar 9 15:57:33 2009
@@ -387,6 +387,7 @@
//todo - client id should be recoreded and this test removed but
handled below
if (_noLocal)
{
+ //todo getPublisherClientInstance should be moved to QueueEntryImpl
final Object publisherId =
entry.getMessage().getPublisherClientInstance();
// We don't want local messages so check to see if message is one
we sent
@@ -407,6 +408,7 @@
//todo - client id should be recoreded and this test removed
but handled here
+ //todo getPublisherIdentifier should be moved to
QueueEntryImpl
if (localInstance != null &&
localInstance.equals(entry.getMessage().getPublisherIdentifier()))
{
return false;
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Mon Mar 9 15:57:33 2009
@@ -127,9 +127,9 @@
{
if (debug)
{
- _log.debug("Discarding message: " +
queueEntry.getMessage().getMessageId());
+ _log.debug("Discarding message: " +
queueEntry.getMessageId());
}
- if(queueEntry.getMessage().isPersistent())
+ if(queueEntry.isPersistent())
{
beginTranIfNecessary();
}
@@ -175,9 +175,9 @@
if (debug)
{
- _log.debug("Discarding message: " +
queueEntry.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessageId());
}
- if(queueEntry.getMessage().isPersistent())
+ if(queueEntry.isPersistent())
{
beginTranIfNecessary();
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
Mon Mar 9 15:57:33 2009
@@ -172,7 +172,7 @@
{
for (QueueEntry msg : messages)
{
- ids.add(msg.getMessage().getMessageId());
+ ids.add(msg.getMessageId());
}
}
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
Mon Mar 9 15:57:33 2009
@@ -87,7 +87,7 @@
while(queueEntries.advance())
{
QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(),
entry);
+ _unacknowledgedMessageMap.add(entry.getMessageId(), entry);
// Store the entry for future inspection
_referenceList.add(entry);
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Mon Mar 9 15:57:33 2009
@@ -215,6 +215,11 @@
return null; //To change body of implemented methods use
File | Settings | File Templates.
}
+ public Long getMessageId()
+ {
+ return null; //To change body of implemented methods use
File | Settings | File Templates.
+ }
+
public long getSize()
{
return 0; //To change body of implemented methods use
File | Settings | File Templates.
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
Mon Mar 9 15:57:33 2009
@@ -100,7 +100,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -140,7 +140,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -159,7 +159,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -198,7 +198,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -217,7 +217,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -236,7 +236,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -254,7 +254,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -294,7 +294,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -312,7 +312,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -352,7 +352,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -384,7 +384,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -425,7 +425,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -464,7 +464,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(),
queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object)
message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=751718&r1=751717&r2=751718&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
Mon Mar 9 15:57:33 2009
@@ -68,17 +68,17 @@
ArrayList<QueueEntry> msgs = _subscription.getQueueEntries();
try
{
- assertEquals(new Long(1 + messagIDOffset),
msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6 + messagIDOffset),
msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8 + messagIDOffset),
msgs.get(2).getMessage().getMessageId());
-
- assertEquals(new Long(2 + messagIDOffset),
msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5 + messagIDOffset),
msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7 + messagIDOffset),
msgs.get(5).getMessage().getMessageId());
-
- assertEquals(new Long(3 + messagIDOffset),
msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4 + messagIDOffset),
msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9 + messagIDOffset),
msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(1 + messagIDOffset),
msgs.get(0).getMessageId());
+ assertEquals(new Long(6 + messagIDOffset),
msgs.get(1).getMessageId());
+ assertEquals(new Long(8 + messagIDOffset),
msgs.get(2).getMessageId());
+
+ assertEquals(new Long(2 + messagIDOffset),
msgs.get(3).getMessageId());
+ assertEquals(new Long(5 + messagIDOffset),
msgs.get(4).getMessageId());
+ assertEquals(new Long(7 + messagIDOffset),
msgs.get(5).getMessageId());
+
+ assertEquals(new Long(3 + messagIDOffset),
msgs.get(6).getMessageId());
+ assertEquals(new Long(4 + messagIDOffset),
msgs.get(7).getMessageId());
+ assertEquals(new Long(9 + messagIDOffset),
msgs.get(8).getMessageId());
}
catch (AssertionFailedError afe)
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]