Author: robbie
Date: Mon Mar 5 12:17:54 2012
New Revision: 1297026
URL: http://svn.apache.org/viewvc?rev=1297026&view=rev
Log:
QPID-3881: Ensure we only put 0-8/0-9/0-9-1 messages in the store if they are
actually routable. Remove some unused and test-only methods.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Mon Mar 5 12:17:54 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -295,26 +296,9 @@ public class AMQChannel implements Sessi
_currentMessage.setExpiration();
+
_currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- MessageMetaData mmd =
_currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- final StoredMessage<MessageMetaData> handle =
_messageStore.addMessage(mmd);
- _currentMessage.setStoredMessage(handle);
-
- routeCurrentMessage();
-
-
- _transaction.addPostTransactionAction(new
ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- }
-
- public void onRollback()
- {
- handle.remove();
- }
- });
+ _currentMessage.route();
deliverCurrentMessageIfComplete();
}
@@ -346,17 +330,41 @@ public class AMQChannel implements Sessi
{
_actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(),
_currentMessage.getRoutingKey()));
}
-
}
else
{
+ final StoredMessage<MessageMetaData> handle =
_messageStore.addMessage(_currentMessage.getMessageMetaData());
+ _currentMessage.setStoredMessage(handle);
+ int bodyCount = _currentMessage.getBodyCount();
+ if(bodyCount > 0)
+ {
+ long bodyLengthReceived = 0;
+ for(int i = 0 ; i < bodyCount ; i++)
+ {
+ ContentChunk contentChunk =
_currentMessage.getContentChunk(i);
+ handle.addContent((int)bodyLengthReceived,
ByteBuffer.wrap(contentChunk.getData()));
+ bodyLengthReceived += contentChunk.getSize();
+ }
+ }
+
+ _transaction.addPostTransactionAction(new
ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ }
+
+ public void onRollback()
+ {
+ handle.remove();
+ }
+ });
+
_transaction.enqueue(destinationQueues,
_currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues),
getProtocolSession().getLastReceivedTime());
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
+ updateTransactionalActivity();
+ _currentMessage.getStoredMessage().flushToStore();
}
}
- _currentMessage.getStoredMessage().flushToStore();
-
}
finally
{
@@ -383,9 +391,6 @@ public class AMQChannel implements Sessi
try
{
-
- // returns true iff the message was delivered (i.e. if all data was
- // received
final ContentChunk contentChunk =
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
@@ -409,11 +414,6 @@ public class AMQChannel implements Sessi
}
}
- protected void routeCurrentMessage() throws AMQException
- {
- _currentMessage.route();
- }
-
public long getNextDeliveryTag()
{
return ++_deliveryTag;
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
Mon Mar 5 12:17:54 2012
@@ -70,8 +70,6 @@ public class IncomingMessage implements
private Exchange _exchange;
-
- private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
// we keep both the original meta data object and the store reference to
it just in case the
@@ -132,12 +130,6 @@ public class IncomingMessage implements
}
- public MessageMetaData headersReceived()
- {
-
- return headersReceived(System.currentTimeMillis());
- }
-
public MessageMetaData headersReceived(long currentTime)
{
_messageMetaData = new MessageMetaData(_messagePublishInfo,
_contentHeaderBody, 0, currentTime);
@@ -150,16 +142,10 @@ public class IncomingMessage implements
return _destinationQueues;
}
- public int addContentBodyFrame(final ContentChunk contentChunk)
- throws AMQException
+ public void addContentBodyFrame(final ContentChunk contentChunk) throws
AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived,
ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
-
-
-
- return _receivedChunkCount++;
}
public boolean allContentReceived()
@@ -259,18 +245,12 @@ public class IncomingMessage implements
return _expiration;
}
- public int getReceivedChunkCount()
- {
- return _receivedChunkCount;
- }
-
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index) throws
IllegalArgumentException, AMQException
+ public ContentChunk getContentChunk(int index)
{
return _contentChunks.get(index);
}
@@ -330,4 +310,9 @@ public class IncomingMessage implements
{
return _connectionReference;
}
+
+ public MessageMetaData getMessageMetaData()
+ {
+ return _messageMetaData;
+ }
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Mon Mar 5 12:17:54 2012
@@ -123,7 +123,7 @@ public class AbstractHeadersExchangeTest
protected int route(Message m) throws AMQException
{
- m.getIncomingMessage().headersReceived();
+ m.getIncomingMessage().headersReceived(System.currentTimeMillis());
m.route(exchange);
if(m.getIncomingMessage().allContentReceived())
{
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
Mon Mar 5 12:17:54 2012
@@ -350,7 +350,7 @@ public class TopicExchangeTest extends I
private int routeMessage(final IncomingMessage message)
throws AMQException
{
- MessageMetaData mmd = message.headersReceived();
+ MessageMetaData mmd =
message.headersReceived(System.currentTimeMillis());
message.setStoredMessage(_store.addMessage(mmd));
message.enqueue(_exchange.route(message));
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Mon Mar 5 12:17:54 2012
@@ -35,6 +35,8 @@ import org.apache.qpid.server.subscripti
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import javax.management.Notification;
+
+import java.nio.ByteBuffer;
import java.util.ArrayList;
/** This class tests all the alerts an AMQQueue can throw based on threshold
values of different parameters */
@@ -300,7 +302,7 @@ public class AMQQueueAlertTest extends I
messages[i] = message(false, size);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(getQueue());
- metaData[i] = messages[i].headersReceived();
+ metaData[i] =
messages[i].headersReceived(System.currentTimeMillis());
messages[i].setStoredMessage(getMessageStore().addMessage(metaData[i]));
messages[i].enqueue(qs);
@@ -309,30 +311,29 @@ public class AMQQueueAlertTest extends I
for (int i = 0; i < messageCount; i++)
{
- messages[i].addContentBodyFrame(
- new ContentChunk()
- {
-
- private byte[] _data = new byte[(int)size];
-
- public int getSize()
- {
- return (int) size;
- }
-
- public byte[] getData()
- {
- return _data;
- }
+ ContentChunk contentChunk = new ContentChunk()
+ {
+ private byte[] _data = new byte[(int)size];
- public void reduceToFit()
- {
+ public int getSize()
+ {
+ return (int) size;
+ }
+
+ public byte[] getData()
+ {
+ return _data;
+ }
+
+ public void reduceToFit()
+ {
+ }
+ };
- }
- });
+ messages[i].addContentBodyFrame(contentChunk);
+ messages[i].getStoredMessage().addContent(0,
ByteBuffer.wrap(contentChunk.getData()));
getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage()));
-
}
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Mon Mar 5 12:17:54 2012
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.AMQChannel;
@@ -45,6 +46,7 @@ import javax.management.openmbean.Compos
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -457,15 +459,16 @@ public class AMQQueueMBeanTest extends I
currentMessage.enqueue(qs);
// route header
- MessageMetaData mmd = currentMessage.headersReceived();
- currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+ MessageMetaData mmd =
currentMessage.headersReceived(System.currentTimeMillis());
- // Add the body so we have something to test later
- currentMessage.addContentBodyFrame(
- getSession().getMethodRegistry()
-
.getProtocolVersionMethodConverter()
- .convertToContentChunk(
- new ContentBody(new
byte[(int) MESSAGE_SIZE])));
+ // Add the message to the store so we have something to test later
+ currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+ ContentChunk chunk = getSession().getMethodRegistry()
+
.getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new ContentBody(new byte[(int)
MESSAGE_SIZE]));
+ currentMessage.addContentBodyFrame(chunk);
+ currentMessage.getStoredMessage().addContent(0,
ByteBuffer.wrap(chunk.getData()));
AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
for(BaseQueue q : currentMessage.getDestinationQueues())
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
Mon Mar 5 12:17:54 2012
@@ -143,7 +143,7 @@ public class AckTest extends InternalBro
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived();
+ MessageMetaData mmd =
msg.headersReceived(System.currentTimeMillis());
final StoredMessage storedMessage = _messageStore.addMessage(mmd);
msg.setStoredMessage(storedMessage);
final AMQMessage message = new AMQMessage(storedMessage);
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Mon Mar 5 12:17:54 2012
@@ -633,7 +633,7 @@ public class SimpleAMQQueueTest extends
// Send persistent message
qs.add(_queue);
- MessageMetaData metaData = msg.headersReceived();
+ MessageMetaData metaData =
msg.headersReceived(System.currentTimeMillis());
StoredMessage handle = _store.addMessage(metaData);
msg.setStoredMessage(handle);
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
Mon Mar 5 12:17:54 2012
@@ -600,7 +600,7 @@ public class MessageStoreTest extends In
currentMessage.setExpiration();
- MessageMetaData mmd = currentMessage.headersReceived();
+ MessageMetaData mmd =
currentMessage.headersReceived(System.currentTimeMillis());
currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
currentMessage.getStoredMessage().flushToStore();
currentMessage.route();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]