Author: robbie
Date: Mon Mar  5 12:17:54 2012
New Revision: 1297026

URL: http://svn.apache.org/viewvc?rev=1297026&view=rev
Log:
QPID-3881: Ensure that 0-8/0-9/0-9-1 messages are only put in the store if
they are actually routable. Also remove some unused and test-only methods.

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Mon Mar  5 12:17:54 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -295,26 +296,9 @@ public class AMQChannel implements Sessi
 
             _currentMessage.setExpiration();
 
+            
_currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
 
-            MessageMetaData mmd = 
_currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
-            final StoredMessage<MessageMetaData> handle = 
_messageStore.addMessage(mmd);
-            _currentMessage.setStoredMessage(handle);
-
-            routeCurrentMessage();
-
-
-            _transaction.addPostTransactionAction(new 
ServerTransaction.Action()
-            {
-
-                public void postCommit()
-                {
-                }
-
-                public void onRollback()
-                {
-                    handle.remove();
-                }
-            });
+            _currentMessage.route();
 
             deliverCurrentMessageIfComplete();
         }
@@ -346,17 +330,41 @@ public class AMQChannel implements Sessi
                         {
                             
_actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(),
 _currentMessage.getRoutingKey()));
                         }
-
                     }
                     else
                     {
+                        final StoredMessage<MessageMetaData> handle = 
_messageStore.addMessage(_currentMessage.getMessageMetaData());
+                        _currentMessage.setStoredMessage(handle);
+                        int bodyCount = _currentMessage.getBodyCount();
+                        if(bodyCount > 0)
+                        {
+                            long bodyLengthReceived = 0;
+                            for(int i = 0 ; i < bodyCount ; i++)
+                            {
+                                ContentChunk contentChunk = 
_currentMessage.getContentChunk(i);
+                                handle.addContent((int)bodyLengthReceived, 
ByteBuffer.wrap(contentChunk.getData()));
+                                bodyLengthReceived += contentChunk.getSize();
+                            }
+                        }
+
+                        _transaction.addPostTransactionAction(new 
ServerTransaction.Action()
+                        {
+                            public void postCommit()
+                            {
+                            }
+
+                            public void onRollback()
+                            {
+                                handle.remove();
+                            }
+                        });
+
                         _transaction.enqueue(destinationQueues, 
_currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), 
getProtocolSession().getLastReceivedTime());
                         incrementOutstandingTxnsIfNecessary();
-                                   updateTransactionalActivity();
+                        updateTransactionalActivity();
+                        _currentMessage.getStoredMessage().flushToStore();
                     }
                 }
-                _currentMessage.getStoredMessage().flushToStore();
-
             }
             finally
             {
@@ -383,9 +391,6 @@ public class AMQChannel implements Sessi
 
         try
         {
-
-            // returns true iff the message was delivered (i.e. if all data was
-            // received
             final ContentChunk contentChunk =
                     
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
 
@@ -409,11 +414,6 @@ public class AMQChannel implements Sessi
         }
     }
 
-    protected void routeCurrentMessage() throws AMQException
-    {
-        _currentMessage.route();
-    }
-
     public long getNextDeliveryTag()
     {
         return ++_deliveryTag;

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 Mon Mar  5 12:17:54 2012
@@ -70,8 +70,6 @@ public class IncomingMessage implements 
 
     private Exchange _exchange;
 
-
-    private int _receivedChunkCount = 0;
     private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
 
     // we keep both the original meta data object and the store reference to 
it just in case the
@@ -132,12 +130,6 @@ public class IncomingMessage implements 
 
     }
 
-    public MessageMetaData headersReceived()
-    {
-
-        return headersReceived(System.currentTimeMillis());
-    }
-
     public MessageMetaData headersReceived(long currentTime)
     {
         _messageMetaData = new MessageMetaData(_messagePublishInfo, 
_contentHeaderBody, 0, currentTime);
@@ -150,16 +142,10 @@ public class IncomingMessage implements 
         return _destinationQueues;
     }
 
-    public int addContentBodyFrame(final ContentChunk contentChunk)
-            throws AMQException
+    public void addContentBodyFrame(final ContentChunk contentChunk) throws 
AMQException
     {
-        _storedMessageHandle.addContent((int)_bodyLengthReceived, 
ByteBuffer.wrap(contentChunk.getData()));
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
-
-
-
-        return _receivedChunkCount++;
     }
 
     public boolean allContentReceived()
@@ -259,18 +245,12 @@ public class IncomingMessage implements 
         return _expiration;
     }
 
-    public int getReceivedChunkCount()
-    {
-        return _receivedChunkCount;
-    }
-
-
     public int getBodyCount() throws AMQException
     {
         return _contentChunks.size();
     }
 
-    public ContentChunk getContentChunk(int index) throws 
IllegalArgumentException, AMQException
+    public ContentChunk getContentChunk(int index)
     {
         return _contentChunks.get(index);
     }
@@ -330,4 +310,9 @@ public class IncomingMessage implements 
     {
         return _connectionReference;
     }
+
+    public MessageMetaData getMessageMetaData()
+    {
+        return _messageMetaData;
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 Mon Mar  5 12:17:54 2012
@@ -123,7 +123,7 @@ public class AbstractHeadersExchangeTest
 
     protected int route(Message m) throws AMQException
     {
-        m.getIncomingMessage().headersReceived();
+        m.getIncomingMessage().headersReceived(System.currentTimeMillis());
         m.route(exchange);
         if(m.getIncomingMessage().allContentReceived())
         {

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
 Mon Mar  5 12:17:54 2012
@@ -350,7 +350,7 @@ public class TopicExchangeTest extends I
     private int routeMessage(final IncomingMessage message)
             throws AMQException
     {
-        MessageMetaData mmd = message.headersReceived();
+        MessageMetaData mmd = 
message.headersReceived(System.currentTimeMillis());
         message.setStoredMessage(_store.addMessage(mmd));
 
         message.enqueue(_exchange.route(message));

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Mon Mar  5 12:17:54 2012
@@ -35,6 +35,8 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.util.InternalBrokerBaseCase;
 
 import javax.management.Notification;
+
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
 /** This class tests all the alerts an AMQQueue can throw based on threshold 
values of different parameters */
@@ -300,7 +302,7 @@ public class AMQQueueAlertTest extends I
             messages[i] = message(false, size);
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(getQueue());
-            metaData[i] = messages[i].headersReceived();
+            metaData[i] = 
messages[i].headersReceived(System.currentTimeMillis());
             
messages[i].setStoredMessage(getMessageStore().addMessage(metaData[i]));
 
             messages[i].enqueue(qs);
@@ -309,30 +311,29 @@ public class AMQQueueAlertTest extends I
 
         for (int i = 0; i < messageCount; i++)
         {
-            messages[i].addContentBodyFrame(
-                    new ContentChunk()
-                    {
-
-                        private byte[] _data = new byte[(int)size];
-
-                        public int getSize()
-                        {
-                            return (int) size;
-                        }
-
-                        public byte[] getData()
-                        {
-                            return _data;
-                        }
+            ContentChunk contentChunk = new ContentChunk()
+            {
+                private byte[] _data = new byte[(int)size];
 
-                        public void reduceToFit()
-                        {
+                public int getSize()
+                {
+                    return (int) size;
+                }
+
+                public byte[] getData()
+                {
+                    return _data;
+                }
+
+                public void reduceToFit()
+                {
+                }
+            };
 
-                        }
-                    });
+            messages[i].addContentBodyFrame(contentChunk);
+            messages[i].getStoredMessage().addContent(0, 
ByteBuffer.wrap(contentChunk.getData()));
 
             getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage()));
-
         }
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Mon Mar  5 12:17:54 2012
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.server.AMQChannel;
@@ -45,6 +46,7 @@ import javax.management.openmbean.Compos
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.TabularData;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -457,15 +459,16 @@ public class AMQQueueMBeanTest extends I
             currentMessage.enqueue(qs);
 
             // route header
-            MessageMetaData mmd = currentMessage.headersReceived();
-            currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+            MessageMetaData mmd = 
currentMessage.headersReceived(System.currentTimeMillis());
 
-            // Add the body so we have something to test later
-            currentMessage.addContentBodyFrame(
-                    getSession().getMethodRegistry()
-                                                       
.getProtocolVersionMethodConverter()
-                                                       .convertToContentChunk(
-                                                       new ContentBody(new 
byte[(int) MESSAGE_SIZE])));
+            // Add the message to the store so we have something to test later
+            currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+            ContentChunk chunk = getSession().getMethodRegistry()
+                                               
.getProtocolVersionMethodConverter()
+                                               .convertToContentChunk(
+                                               new ContentBody(new byte[(int) 
MESSAGE_SIZE]));
+            currentMessage.addContentBodyFrame(chunk);
+            currentMessage.getStoredMessage().addContent(0, 
ByteBuffer.wrap(chunk.getData()));
 
             AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
             for(BaseQueue q : currentMessage.getDestinationQueues())

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
 Mon Mar  5 12:17:54 2012
@@ -143,7 +143,7 @@ public class AckTest extends InternalBro
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
             msg.enqueue(qs);
-            MessageMetaData mmd = msg.headersReceived();
+            MessageMetaData mmd = 
msg.headersReceived(System.currentTimeMillis());
             final StoredMessage storedMessage = _messageStore.addMessage(mmd);
             msg.setStoredMessage(storedMessage);
             final AMQMessage message = new AMQMessage(storedMessage);

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Mon Mar  5 12:17:54 2012
@@ -633,7 +633,7 @@ public class SimpleAMQQueueTest extends 
         // Send persistent message
 
         qs.add(_queue);
-        MessageMetaData metaData = msg.headersReceived();
+        MessageMetaData metaData = 
msg.headersReceived(System.currentTimeMillis());
         StoredMessage handle = _store.addMessage(metaData);
         msg.setStoredMessage(handle);
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1297026&r1=1297025&r2=1297026&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 Mon Mar  5 12:17:54 2012
@@ -600,7 +600,7 @@ public class MessageStoreTest extends In
 
         currentMessage.setExpiration();
 
-        MessageMetaData mmd = currentMessage.headersReceived();
+        MessageMetaData mmd = 
currentMessage.headersReceived(System.currentTimeMillis());
         
currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
         currentMessage.getStoredMessage().flushToStore();
         currentMessage.route();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to