Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Sun Oct 11 15:10:43 2009
@@ -1,6 +1,6 @@
 package org.apache.qpid.server.queue;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -31,21 +31,18 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.Transaction;
 
 public class SimpleAMQQueueTest extends TestCase
 {
@@ -59,7 +56,7 @@
     protected DirectExchange _exchange = new DirectExchange();
     protected MockSubscription _subscription = new MockSubscription();
     protected FieldTable _arguments = null;
-    
+
     MessagePublishInfo info = new MessagePublishInfo()
     {
 
@@ -88,7 +85,7 @@
             return null;
         }
     };
-    
+
     @Override
     protected void setUp() throws Exception
     {
@@ -119,51 +116,51 @@
         }
         catch (IllegalArgumentException e)
         {
-            assertTrue("Exception was not about missing name", 
+            assertTrue("Exception was not about missing name",
                             e.getMessage().contains("name"));
         }
-        
+
         try {
             _queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
             assertNull("Queue was created", _queue);
         }
         catch (IllegalArgumentException e)
         {
-            assertTrue("Exception was not about missing vhost", 
+            assertTrue("Exception was not about missing vhost",
                     e.getMessage().contains("Host"));
         }
 
-        _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, 
false, _owner, false, 
+        _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, 
false, _owner, false,
                                                                 _virtualHost, 
_arguments);
         assertNotNull("Queue was not created", _queue);
     }
-    
+
     public void testGetVirtualHost()
     {
         assertEquals("Virtual host was wrong", _virtualHost, 
_queue.getVirtualHost());
     }
-    
+
     public void testBinding()
     {
         try
         {
             _queue.bind(_exchange, _routingKey, null);
-            assertTrue("Routing key was not bound", 
+            assertTrue("Routing key was not bound",
                             _exchange.getBindings().containsKey(_routingKey));
-            assertEquals("Queue was not bound to key", 
+            assertEquals("Queue was not bound to key",
                         _exchange.getBindings().get(_routingKey).get(0),
                         _queue);
-            assertEquals("Exchange binding count", 1, 
+            assertEquals("Exchange binding count", 1,
                     _queue.getExchangeBindings().size());
-            assertEquals("Wrong exchange bound", _routingKey, 
+            assertEquals("Wrong exchange bound", _routingKey,
                     _queue.getExchangeBindings().get(0).getRoutingKey());
-            assertEquals("Wrong exchange bound", _exchange, 
+            assertEquals("Wrong exchange bound", _exchange,
                     _queue.getExchangeBindings().get(0).getExchange());
-            
+
             _queue.unBind(_exchange, _routingKey, null);
-            assertFalse("Routing key was still bound", 
+            assertFalse("Routing key was still bound",
                     _exchange.getBindings().containsKey(_routingKey));
-            assertNull("Routing key was not empty", 
+            assertNull("Routing key was not empty",
                     _exchange.getBindings().get(_routingKey));
         }
         catch (AMQException e)
@@ -171,36 +168,36 @@
             assertNull("Unexpected exception", e);
         }
     }
-    
+
     public void testSubscription() throws AMQException
     {
         // Check adding a subscription adds it to the queue
         _queue.registerSubscription(_subscription, false);
-        assertEquals("Subscription did not get queue", _queue, 
+        assertEquals("Subscription did not get queue", _queue,
                       _subscription.getQueue());
-        assertEquals("Queue does not have consumer", 1, 
+        assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
-        assertEquals("Queue does not have active consumer", 1, 
+        assertEquals("Queue does not have active consumer", 1,
                 _queue.getActiveConsumerCount());
-        
+
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
         assertEquals(messageA, 
_subscription.getQueueContext().getLastSeenEntry().getMessage());
-        
+
         // Check removing the subscription removes it's information from the 
queue
         _queue.unregisterSubscription(_subscription);
         assertTrue("Subscription still had queue", _subscription.isClosed());
         assertFalse("Queue still has consumer", 1 == 
_queue.getConsumerCount());
-        assertFalse("Queue still has active consumer", 
+        assertFalse("Queue still has active consumer",
                 1 == _queue.getActiveConsumerCount());
-        
+
         AMQMessage messageB = createMessage(new Long (25));
         _queue.enqueue(messageB);
-        QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
-        assertNull(entry);
+         assertNull(_subscription.getQueueContext());
+
     }
-    
+
     public void testQueueNoSubscriber() throws AMQException, 
InterruptedException
     {
         AMQMessage messageA = createMessage(new Long(24));
@@ -214,18 +211,18 @@
     {
         // Check adding an exclusive subscription adds it to the queue
         _queue.registerSubscription(_subscription, true);
-        assertEquals("Subscription did not get queue", _queue, 
+        assertEquals("Subscription did not get queue", _queue,
                 _subscription.getQueue());
-        assertEquals("Queue does not have consumer", 1, 
+        assertEquals("Queue does not have consumer", 1,
                 _queue.getConsumerCount());
-        assertEquals("Queue does not have active consumer", 1, 
+        assertEquals("Queue does not have active consumer", 1,
                 _queue.getActiveConsumerCount());
 
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
         assertEquals(messageA, 
_subscription.getQueueContext().getLastSeenEntry().getMessage());
-        
+
         // Check we cannot add a second subscriber to the queue
         Subscription subB = new MockSubscription();
         Exception ex = null;
@@ -235,12 +232,12 @@
         }
         catch (AMQException e)
         {
-           ex = e; 
+           ex = e;
         }
         assertNotNull(ex);
         assertTrue(ex instanceof AMQException);
 
-        // Check we cannot add an exclusive subscriber to a queue with an 
+        // Check we cannot add an exclusive subscriber to a queue with an
         // existing subscription
         _queue.unregisterSubscription(_subscription);
         _queue.registerSubscription(_subscription, false);
@@ -250,15 +247,15 @@
         }
         catch (AMQException e)
         {
-           ex = e; 
+           ex = e;
         }
         assertNotNull(ex);
     }
-    
-    public void testAutoDeleteQueue() throws Exception 
+
+    public void testAutoDeleteQueue() throws Exception
     {
        _queue.stop();
-       _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+       _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost);
        _queue.registerSubscription(_subscription, false);
        AMQMessage message = createMessage(new Long(25));
        _queue.enqueue(message);
@@ -266,7 +263,7 @@
        assertTrue("Queue was not deleted when subscription was removed",
                   _queue.isDeleted());
     }
-    
+
     public void testResend() throws Exception
     {
         _queue.registerSubscription(_subscription, false);
@@ -276,9 +273,9 @@
         QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
         entry.setRedelivered(true);
         _queue.resend(entry, _subscription);
-        
+
     }
-    
+
     public void testGetFirstMessageId() throws Exception
     {
         // Create message
@@ -335,7 +332,7 @@
             assertEquals("Message ID was wrong", messageId, msgids.get(i));
         }
     }
-    
+
     public void testGetMessagesRangeOnTheQueue() throws Exception
     {
         for (int i = 1 ; i <= 10; i++)
@@ -346,12 +343,12 @@
             // Put message on queue
             _queue.enqueue(message);
         }
-        
+
         // Get non-existent 0th QueueEntry & check returned list was empty
         // (the position parameters in this method are indexed from 1)
         List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
         assertTrue(entries.size() == 0);
-        
+
         // Check that when 'from' is 0 it is ignored and the range continues 
from 1
         entries = _queue.getMessagesRangeOnTheQueue(0, 2);
         assertTrue(entries.size() == 2);
@@ -363,13 +360,13 @@
         // Check that when 'from' is greater than 'to' the returned list is 
empty
         entries = _queue.getMessagesRangeOnTheQueue(5, 4);
         assertTrue(entries.size() == 0);
-        
-        // Get first QueueEntry & check id 
+
+        // Get first QueueEntry & check id
         entries = _queue.getMessagesRangeOnTheQueue(1, 1);
         assertTrue(entries.size() == 1);
         msgID = entries.get(0).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 1L);
-        
+
         // Get 5th,6th,7th entries and check id's
         entries = _queue.getMessagesRangeOnTheQueue(5, 7);
         assertTrue(entries.size() == 3);
@@ -379,17 +376,17 @@
         assertEquals("Message ID was wrong", msgID, 6L);
         msgID = entries.get(2).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 7L);
-        
+
         // Get 10th QueueEntry & check id
         entries = _queue.getMessagesRangeOnTheQueue(10, 10);
         assertTrue(entries.size() == 1);
         msgID = entries.get(0).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 10L);
-        
+
         // Get non-existent 11th QueueEntry & check returned set was empty
         entries = _queue.getMessagesRangeOnTheQueue(11, 11);
         assertTrue(entries.size() == 0);
-        
+
         // Get 9th,10th, and non-existent 11th entries & check result is of 
size 2 with correct IDs
         entries = _queue.getMessagesRangeOnTheQueue(9, 11);
         assertTrue(entries.size() == 2);
@@ -398,35 +395,51 @@
         msgID = entries.get(1).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 10L);
     }
-  
+
     public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() 
throws AMQException
     {
         // Create IncomingMessage and nondurable queue
-        NonTransactionalContext txnContext = new 
NonTransactionalContext(_store, null, null, null);
-        IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+        final IncomingMessage msg = new IncomingMessage(1L, info,  null);
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.properties = new BasicContentHeaderProperties();
         ((BasicContentHeaderProperties) 
contentHeaderBody.properties).setDeliveryMode((byte) 2);
         msg.setContentHeaderBody(contentHeaderBody);
-        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
-        
+        final ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
         // Send persistent message
+
         qs.add(_queue);
-        msg.enqueue(qs);
         msg.routingComplete(_store, new MessageHandleFactory());
-        _store.storeMessageMetaData(null, new Long(1L), new 
MessageMetaData(info, contentHeaderBody, 1));
-        
+        _store.storeMessageMetaData(new Long(1L), new MessageMetaData(info, 
contentHeaderBody, 1));
+
+
+        Transaction txn = new AutoCommitTransaction(_store);
+
+        txn.enqueue(qs, msg, new Transaction.Action()
+                                    {
+                                        public void postCommit()
+                                        {
+                                            msg.enqueue(qs);
+                                        }
+
+                                        public void onRollback()
+                                        {
+                                        }
+                                    });
+
+
+
         // Check that it is enqueued
         AMQQueue data = _store.getMessages().get(1L);
-        assertNotNull(data);
-        
+        assertNull(data);
+
         // Dequeue message
         MockQueueEntry entry = new MockQueueEntry();
-        AMQMessage amqmsg = new AMQMessage(1L, _store, new 
MessageHandleFactory(), txnContext);
-        
+        AMQMessage amqmsg = new AMQMessage(1L, _store, new 
MessageHandleFactory());
+
         entry.setMessage(amqmsg);
-        _queue.dequeue(null, entry);
-        
+        _queue.dequeue(entry);
+
         // Check that it is dequeued
         data = _store.getMessages().get(1L);
         assertNull(data);
@@ -434,29 +447,13 @@
 
 
     // FIXME: move this to somewhere useful
-    private static AMQMessageHandle createMessageHandle(final long messageId, 
final MessagePublishInfo publishBody)
+    private static AMQMessageHandle createMessageHandle(final long messageId, 
final MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody)
     {
         final AMQMessageHandle amqMessageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId,
                                                                                
                    null,
                                                                                
                    false);
-        try
-        {
-            amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
-                                                              publishBody,
-                                                              new 
ContentHeaderBody()
-            {
-                public int getSize()
-                {
-                    return 1;
-                }
-            });
-        }
-        catch (AMQException e)
-        {
-            // won't happen
-        }
-
 
+        amqMessageHandle.setPublishAndContentHeaderBody(publishBody, 
contentHeaderBody);
         return amqMessageHandle;
     }
 
@@ -468,11 +465,16 @@
         TestMessage(long tag, long messageId, MessagePublishInfo publishBody, 
StoreContext storeContext)
                 throws AMQException
         {
-            super(createMessageHandle(messageId, publishBody), storeContext, 
publishBody);
+            this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new 
BasicContentHeaderProperties(), 0));
+
+        }
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, 
ContentHeaderBody chb)
+                throws AMQException
+        {
+            super(createMessageHandle(messageId, publishBody, chb), chb, 0, 
publishBody);
             _tag = tag;
         }
 
-
         public boolean incrementReference()
         {
             _count++;
@@ -489,7 +491,7 @@
             assertEquals("Wrong count for message with tag " + _tag, expected, 
_count);
         }
     }
-    
+
     protected AMQMessage createMessage(Long id) throws AMQException
     {
         AMQMessage messageA = new TestMessage(id, id, info, new 
StoreContext());

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 Sun Oct 11 15:10:43 2009
@@ -29,15 +29,9 @@
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.ExchangeBinding;
-import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.framing.AMQShortString;
@@ -344,22 +338,21 @@
 
         MessagePublishInfo messageInfo = new 
TestMessagePublishInfo(directExchange, false, false, routingKey);
 
-        IncomingMessage currentMessage = null;
+        final IncomingMessage currentMessage;
 
         try
         {
             currentMessage = new 
IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
                                                  messageInfo,
-                                                 new 
NonTransactionalContext(_virtualHost.getMessageStore(),
-                                                                             
new StoreContext(), null, null),
                                                  new 
InternalTestProtocolSession(_virtualHost));
         }
         catch (AMQException e)
         {
             fail(e.getMessage());
+            //help compiler - next line never reached
+            throw new RuntimeException();
         }
 
-        currentMessage.setMessageStore(_virtualHost.getMessageStore());
         currentMessage.setExchange(directExchange);
 
         ContentHeaderBody headerBody = new ContentHeaderBody();
@@ -379,14 +372,9 @@
 
         currentMessage.setExpiration();
 
-        try
-        {
-            currentMessage.route();
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+
+        currentMessage.route();
+
 
         try
         {
@@ -400,14 +388,32 @@
         // check and deliver if header says body length is zero
         if (currentMessage.allContentReceived())
         {
-            try
-            {
-                currentMessage.deliverToQueues();
-            }
-            catch (AMQException e)
-            {
-                fail(e.getMessage());
-            }
+            // TODO Deliver to queues
+            Transaction trans = new 
AutoCommitTransaction(_virtualHost.getMessageStore());
+            final List<AMQQueue> destinationQueues = 
currentMessage.getDestinationQueues();
+            trans.enqueue(currentMessage.getDestinationQueues(), 
currentMessage, new Transaction.Action() {
+                public void postCommit()
+                {
+                    try
+                    {
+                        AMQMessage message = new 
AMQMessage(currentMessage.getMessageHandle(), 
currentMessage.getContentHeader(), currentMessage.getSize() 
,currentMessage.getMessagePublishInfo());
+
+                        for(AMQQueue queue : destinationQueues)
+                        {
+                            QueueEntry entry = queue.enqueue(message);
+                        }
+                    }
+                    catch (AMQException e)
+                    {
+                        e.printStackTrace();  
+                    }
+                }
+
+                public void onRollback()
+                {
+                    //To change body of implemented methods use File | 
Settings | File Templates.
+                }
+            });
         }
     }
 

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
 Sun Oct 11 15:10:43 2009
@@ -55,7 +55,7 @@
     {
     }
 
-    public void removeMessage(StoreContext s, Long messageId)
+    public void removeMessage(Long messageId)
     {
     }
 
@@ -100,6 +100,23 @@
     {
     }
 
+    public StoreFuture commitTranAsync(StoreContext context) throws 
AMQException
+    {
+        commitTran(context);
+        return new StoreFuture() 
+                    {
+                        public boolean isComplete()
+                        {
+                            return true;
+                        }
+
+                        public void waitForCompletion()
+                        {
+
+                        }
+                    };
+    }
+
     public void abortTran(StoreContext storeContext) throws AMQException
     {
     }
@@ -114,22 +131,26 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext sc, Long messageId, int 
index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    public void storeContentBodyChunk(
+            Long messageId,
+            int index,
+            ContentChunk contentBody,
+            boolean lastContentBody) throws AMQException
     {
 
     }
 
-    public void storeMessageMetaData(StoreContext sc, Long messageId, 
MessageMetaData messageMetaData) throws AMQException
+    public void storeMessageMetaData(Long messageId, MessageMetaData 
messageMetaData) throws AMQException
     {
 
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) 
throws AMQException
+    public MessageMetaData getMessageMetaData(Long messageId) throws 
AMQException
     {
         return null;
     }
 
-    public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int 
index) throws AMQException
+    public ContentChunk getContentBodyChunk(Long messageId, int index) throws 
AMQException
     {
         return null;
     }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
 Sun Oct 11 15:10:43 2009
@@ -29,6 +29,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.MessageMetaData;
 
 /**
  * Tests that reference counting works correctly with AMQMessage and the 
message store
@@ -37,14 +38,12 @@
 {
     private TestMemoryMessageStore _store;
 
-    private StoreContext _storeContext = new StoreContext();
-
 
     protected void setUp() throws Exception
     {
         super.setUp();
         _store = new TestMemoryMessageStore();
-        StoreContext.setCurrentContext(_storeContext);
+
     }
 
     /**
@@ -86,9 +85,13 @@
 
         final long messageId = _store.getNewMessageId();
         AMQMessageHandle messageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId, _store, true);
-        messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
+
+        MessageMetaData mmd = 
messageHandle.setPublishAndContentHeaderBody(info, chb);
+        _store.storeMessageMetaData(messageId, mmd);
+
+
         AMQMessage message = new AMQMessage(messageHandle,
-                                             _storeContext,info);
+                                             chb, chb.bodySize,info);
 
         message = message.takeReference();
 
@@ -145,9 +148,12 @@
         final Long messageId = _store.getNewMessageId();
         final ContentHeaderBody chb = createPersistentContentHeader();
         AMQMessageHandle messageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId, _store, true);
-        messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
+
+        MessageMetaData mmd = 
messageHandle.setPublishAndContentHeaderBody(info, chb);
+        _store.storeMessageMetaData(messageId, mmd);
+
         AMQMessage message = new AMQMessage(messageHandle,
-                                             _storeContext,
+                                             chb, chb.bodySize,
                                             info);
         
         

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 Sun Oct 11 15:10:43 2009
@@ -47,6 +47,10 @@
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
 
+    private final QueueEntry.SubscriptionAcquiredState _owningState = new 
QueueEntry.SubscriptionAcquiredState(this);
+    private final QueueEntry.SubscriptionAssignedState _assignedState = new 
QueueEntry.SubscriptionAssignedState(this);
+
+
     private static final AtomicLong idGenerator = new AtomicLong(0);
     // Create a simple ID that increments for ever new Subscription
     private final long _subscriptionID = idGenerator.getAndIncrement();
@@ -88,7 +92,12 @@
 
     public SubscriptionAcquiredState getOwningState()
     {
-        return new QueueEntry.SubscriptionAcquiredState(this);
+        return _owningState;
+    }
+
+    public QueueEntry.SubscriptionAssignedState getAssignedState()
+    {
+        return _assignedState;
     }
 
     public LogActor getLogActor()

Modified: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
 Sun Oct 11 15:10:43 2009
@@ -41,12 +41,10 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.util.MockChannel;
 
-import java.security.Principal;
 
 public class InternalBrokerBaseCase extends TestCase
 {
@@ -55,7 +53,6 @@
     protected MockChannel _channel;
     protected InternalTestProtocolSession _session;
     protected VirtualHost _virtualHost;
-    protected StoreContext _storeContext = new StoreContext();
     protected AMQQueue _queue;
     protected AMQShortString QUEUE_NAME;
 

Modified: 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
 Sun Oct 11 15:10:43 2009
@@ -28,21 +28,43 @@
 
 public class SimpleConnectionTest extends TestCase
 {
-    public void testConnection()
+/*    public void testConnection()
     {
         try
         {
             AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", 
"guest", "test", "/test");
+
+
             QueueSession s = conn.createQueueSession(false, 
Session.AUTO_ACKNOWLEDGE);
             QueueSender p = s.createSender(new AMQQueue("amq.direct", 
"queue"));
-            p.send(s.createTextMessage("test"));
+            for(int i = 0; i < 6000; i++)
+            {
+                p.send(s.createTextMessage("test("+i+")"));
+            }
 
             QueueReceiver r = s.createReceiver(new AMQQueue("amq.direct", 
"queue"));
             conn.start();
-            Message m = r.receive();
-
-            Thread.sleep(60000L);
+            Thread.sleep(1000L);
+            for(int i = 0; i < 3000; i++)
+            {
+                Message m = r.receive();
+            }
             conn.close();
+
+            conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", 
"test", "/test");
+            s = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            r = s.createReceiver(new AMQQueue("amq.direct", "queue"));
+            conn.start();
+            Message m;
+            int rcvCnt = 0;
+            while((m = r.receive(1000))!= null)
+            {
+                rcvCnt++;
+            }
+            System.out.print(rcvCnt);
+
+            Thread.sleep(60000l);
+
         }
         catch (AMQException e)
         {
@@ -61,4 +83,68 @@
             e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
         }
     }
-}
+
+
+    public void testConnection2()
+    {
+        try
+        {
+            AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", 
"guest", "test", "/test");
+            AMQConnection conn2 = new AMQConnection("127.0.0.1", 5673, 
"guest", "guest", "test", "/test");
+
+            AMQQueue amqQueue = new AMQQueue("amq.direct", "queue");
+
+            QueueSession s = conn.createQueueSession(true, 
Session.SESSION_TRANSACTED);
+            QueueSender p = s.createSender(amqQueue);
+
+            QueueSession s2 = conn2.createQueueSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            QueueReceiver r2 = s2.createReceiver(amqQueue);
+            r2.setMessageListener(new MessageListener()
+            {
+
+                public void onMessage(Message message)
+                {
+                    try
+                    {
+                        
System.out.println("***************************************************************************");
+                        
System.out.println("***************************************************************************");
+                        System.out.println("**  " 
+((TextMessage)message).getText());
+                        
System.out.println("***************************************************************************");
+                        
System.out.println("***************************************************************************");
+                    }
+                    catch (JMSException e)
+                    {
+                        e.printStackTrace();  //To change body of catch 
statement use File | Settings | File Templates.
+                    }
+                }
+            });
+            conn2.start();
+
+            for(int i = 0; i < 6000; i++)
+            {
+                p.send(s.createTextMessage("test("+i+")"));
+                if(i%10 == 0)
+                {   Thread.sleep(5000);
+                    s.commit();
+
+                }
+            }
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
+        }
+        catch (URLSyntaxException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
+        }
+        catch (InterruptedException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
+        }
+    }
+*/}

Modified: 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
 Sun Oct 11 15:10:43 2009
@@ -215,7 +215,7 @@
         List<String> results = _monitor.findMatches(BND_PREFIX);
 
         // We will have two binds as we bind all queues to the default exchange
-        assertEquals("Result set larger than expected.", 4, results.size());
+        assertEquals("Result not as expected." + results, 4, results.size());
 
 
         String messageID = "BND-1001";
@@ -241,7 +241,7 @@
 
         String subject = fromSubject(log);
 
-        assertTrue("Routing Key does not start with 
TempQueue:"+AbstractTestLogSubject.getSlice("rk", subject), 
+        assertTrue("Routing Key does not start with 
TempQueue:"+AbstractTestLogSubject.getSlice("rk", subject),
                      AbstractTestLogSubject.getSlice("rk", 
subject).startsWith("TempQueue"));
         assertEquals("Virtualhost not correct.", "/test",
                      AbstractTestLogSubject.getSlice("vh", subject));

Modified: 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
 Sun Oct 11 15:10:43 2009
@@ -183,6 +183,11 @@
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
 
+    public QueueEntry.SubscriptionAssignedState getAssignedState()
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
     public void queueDeleted(AMQQueue queue)
     {
     }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
 Sun Oct 11 15:10:43 2009
@@ -155,10 +155,10 @@
         doPostDelay("close");
     }
 
-    public void removeMessage(StoreContext storeContext, Long messageId) 
throws AMQException
+    public void removeMessage(Long messageId) throws AMQException
     {
         doPreDelay("removeMessage");
-        _realStore.removeMessage(storeContext, messageId);
+        _realStore.removeMessage(messageId);
         doPostDelay("removeMessage");
     }
 
@@ -237,6 +237,24 @@
         doPostDelay("commitTran");
     }
 
+    public StoreFuture commitTranAsync(StoreContext context) throws 
AMQException
+    {
+        commitTran(context);
+        return new StoreFuture() 
+                    {
+                        public boolean isComplete()
+                        {
+                            return true;
+                        }
+
+                        public void waitForCompletion()
+                        {
+
+                        }
+                    };
+
+    }
+
     public void abortTran(StoreContext context) throws AMQException
     {
         doPreDelay("abortTran");
@@ -260,32 +278,36 @@
         return l;
     }
 
-    public void storeContentBodyChunk(StoreContext context, Long messageId, 
int index, ContentChunk contentBody, boolean lastContentBody) throws 
AMQException
+    public void storeContentBodyChunk(
+            Long messageId,
+            int index,
+            ContentChunk contentBody,
+            boolean lastContentBody) throws AMQException
     {
         doPreDelay("storeContentBodyChunk");
-        _realStore.storeContentBodyChunk(context, messageId, index, 
contentBody, lastContentBody);
+        _realStore.storeContentBodyChunk(messageId, index, contentBody, 
lastContentBody);
         doPostDelay("storeContentBodyChunk");
     }
 
-    public void storeMessageMetaData(StoreContext context, Long messageId, 
MessageMetaData messageMetaData) throws AMQException
+    public void storeMessageMetaData(Long messageId, MessageMetaData 
messageMetaData) throws AMQException
     {
         doPreDelay("storeMessageMetaData");
-        _realStore.storeMessageMetaData(context, messageId, messageMetaData);
+        _realStore.storeMessageMetaData(messageId, messageMetaData);
         doPostDelay("storeMessageMetaData");
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long 
messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(Long messageId) throws 
AMQException
     {
         doPreDelay("getMessageMetaData");
-        MessageMetaData mmd = _realStore.getMessageMetaData(context, 
messageId);
+        MessageMetaData mmd = _realStore.getMessageMetaData(messageId);
         doPostDelay("getMessageMetaData");
         return mmd;
     }
 
-    public ContentChunk getContentBodyChunk(StoreContext context, Long 
messageId, int index) throws AMQException
+    public ContentChunk getContentBodyChunk(Long messageId, int index) throws 
AMQException
     {
         doPreDelay("getContentBodyChunk");
-        ContentChunk c = _realStore.getContentBodyChunk(context, messageId, 
index);
+        ContentChunk c = _realStore.getContentBodyChunk(messageId, index);
         doPostDelay("getContentBodyChunk");
         return c;
     }

Modified: 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
 (original)
+++ 
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
 Sun Oct 11 15:10:43 2009
@@ -311,11 +311,13 @@
 
         AMQTopic topic = new AMQTopic(con, "testNoLocal");
 
-        TopicSession session1 = con.createTopicSession(true, 
AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session1 = con.createTopicSession(true, 
AMQSession.AUTO_ACKNOWLEDGE);
         TopicSubscriber noLocal = session1.createSubscriber(topic,  "", true);
+
         TopicSubscriber select = session1.createSubscriber(topic,  "Selector = 
'select'", false);
         TopicSubscriber normal = session1.createSubscriber(topic);
 
+
         TopicPublisher publisher = session1.createPublisher(topic);
 
         con.start();
@@ -329,12 +331,12 @@
         m = (TextMessage) normal.receive(1000);
         assertNotNull(m);
         session1.commit();
-        
+
         //test selector subscriber doesn't message
         m = (TextMessage) select.receive(1000);
         assertNull(m);
         session1.commit();
-        
+
         //test nolocal subscriber doesn't message
         m = (TextMessage) noLocal.receive(1000);
         if (m != null)
@@ -349,12 +351,12 @@
 
         publisher.publish(message);
         session1.commit();
-        
+
         //test normal subscriber gets message
         m = (TextMessage) normal.receive(1000);
         assertNotNull(m);
         session1.commit();
-        
+
         //test selector subscriber does get message
         m = (TextMessage) select.receive(1000);
         assertNotNull(m);
@@ -365,7 +367,7 @@
         assertNull(m);
 
         AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", 
"foo");
-        TopicSession session2 = con2.createTopicSession(true, 
AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session2 = con2.createTopicSession(true, 
AMQSession.AUTO_ACKNOWLEDGE);
         TopicPublisher publisher2 = session2.createPublisher(topic);
 
 
@@ -386,18 +388,18 @@
         session1.commit();
 
         //test nolocal subscriber does message
-        m = (TextMessage) noLocal.receive(100);
+        m = (TextMessage) noLocal.receive(1000);
         assertNotNull(m);
 
 
         con.close();
         con2.close();
     }
-    
+
     /**
      * This tests QPID-1191, where messages which are sent to a topic but are 
not consumed by a subscriber
      * due to a selector can be leaked.
-     * @throws Exception 
+     * @throws Exception
      */
     public void testNonMatchingMessagesDoNotFillQueue() throws Exception
     {
@@ -420,27 +422,27 @@
         message = session.createTextMessage("non-matching 1");
         publisher.publish(message);
         session.commit();
-        
+
         // Send and consume matching message
         message = session.createTextMessage("hello");
         message.setStringProperty("Selector", "select");
 
         publisher.publish(message);
         session.commit();
-        
+
         m = (TextMessage) selector.receive(1000);
         assertNotNull("should have received message", m);
         assertEquals("Message contents were wrong", "hello", m.getText());
-        
+
         // Send non-matching message
         message = session.createTextMessage("non-matching 2");
         publisher.publish(message);
         session.commit();
-        
+
         // Assert queue count is 0
         long depth = ((AMQTopicSessionAdaptor) 
session).getSession().getQueueDepth(topic);
         assertEquals("Queue depth was wrong", 0, depth);
-        
+
     }
 
     public static junit.framework.Test suite()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to