Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Sun Oct 11 15:10:43 2009 @@ -1,6 +1,6 @@ package org.apache.qpid.server.queue; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,16 +8,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ @@ -31,21 +31,18 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionImpl; -import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.Transaction; public class SimpleAMQQueueTest extends TestCase { @@ -59,7 +56,7 @@ protected DirectExchange _exchange = new DirectExchange(); protected MockSubscription _subscription = new MockSubscription(); protected FieldTable _arguments = null; - + MessagePublishInfo info = new MessagePublishInfo() { @@ -88,7 +85,7 @@ return null; } }; - + @Override protected void setUp() throws Exception { @@ -119,51 +116,51 @@ } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing name", + assertTrue("Exception was not about missing name", e.getMessage().contains("name")); } - + try { _queue = new SimpleAMQQueue(_qname, false, _owner, false, null); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing vhost", + assertTrue("Exception was not about missing vhost", e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); assertNotNull("Queue was not created", _queue); } - + public void testGetVirtualHost() { assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); } - + public void testBinding() { try { _queue.bind(_exchange, _routingKey, null); - assertTrue("Routing key was not bound", + assertTrue("Routing key was not bound", _exchange.getBindings().containsKey(_routingKey)); - assertEquals("Queue was not bound to key", + assertEquals("Queue was not bound to key", _exchange.getBindings().get(_routingKey).get(0), _queue); - assertEquals("Exchange binding count", 1, + assertEquals("Exchange binding count", 1, _queue.getExchangeBindings().size()); - assertEquals("Wrong exchange bound", _routingKey, + assertEquals("Wrong exchange bound", _routingKey, _queue.getExchangeBindings().get(0).getRoutingKey()); - assertEquals("Wrong exchange bound", _exchange, + assertEquals("Wrong exchange bound", _exchange, _queue.getExchangeBindings().get(0).getExchange()); - + _queue.unBind(_exchange, _routingKey, null); - assertFalse("Routing key was still bound", + assertFalse("Routing key was still bound", _exchange.getBindings().containsKey(_routingKey)); - assertNull("Routing key was not empty", + assertNull("Routing key was not empty", _exchange.getBindings().get(_routingKey)); } catch (AMQException e) @@ -171,36 +168,36 @@ assertNull("Unexpected exception", e); } } - + public void testSubscription() throws AMQException { // Check adding a subscription adds it to the queue _queue.registerSubscription(_subscription, false); - assertEquals("Subscription did not get queue", _queue, + assertEquals("Subscription did not get queue", _queue, _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, + assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, + assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); - + // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - + // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); assertTrue("Subscription still had queue", _subscription.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); - assertFalse("Queue still has active consumer", + assertFalse("Queue still has active consumer", 1 == _queue.getActiveConsumerCount()); - + AMQMessage messageB = createMessage(new Long (25)); _queue.enqueue(messageB); - QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); - assertNull(entry); + assertNull(_subscription.getQueueContext()); + } - + public void testQueueNoSubscriber() throws AMQException, InterruptedException { AMQMessage messageA = createMessage(new Long(24)); @@ -214,18 +211,18 @@ { // Check adding an exclusive subscription adds it to the queue _queue.registerSubscription(_subscription, true); - assertEquals("Subscription did not get queue", _queue, + assertEquals("Subscription did not get queue", _queue, _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, + assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, + assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - + // Check we cannot add a second subscriber to the queue Subscription subB = new MockSubscription(); Exception ex = null; @@ -235,12 +232,12 @@ } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); assertTrue(ex instanceof AMQException); - // Check we cannot add an exclusive subscriber to a queue with an + // Check we cannot add an exclusive subscriber to a queue with an // existing subscription _queue.unregisterSubscription(_subscription); _queue.registerSubscription(_subscription, false); @@ -250,15 +247,15 @@ } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); } - - public void testAutoDeleteQueue() throws Exception + + public void testAutoDeleteQueue() throws Exception { _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); + _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); _queue.enqueue(message); @@ -266,7 +263,7 @@ assertTrue("Queue was not deleted when subscription was removed", _queue.isDeleted()); } - + public void testResend() throws Exception { _queue.registerSubscription(_subscription, false); @@ -276,9 +273,9 @@ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); entry.setRedelivered(true); _queue.resend(entry, _subscription); - + } - + public void testGetFirstMessageId() throws Exception { // Create message @@ -335,7 +332,7 @@ assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } - + public void testGetMessagesRangeOnTheQueue() throws Exception { for (int i = 1 ; i <= 10; i++) @@ -346,12 +343,12 @@ // Put message on queue _queue.enqueue(message); } - + // Get non-existent 0th QueueEntry & check returned list was empty // (the position parameters in this method are indexed from 1) List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0); assertTrue(entries.size() == 0); - + // Check that when 'from' is 0 it is ignored and the range continues from 1 entries = _queue.getMessagesRangeOnTheQueue(0, 2); assertTrue(entries.size() == 2); @@ -363,13 +360,13 @@ // Check that when 'from' is greater than 'to' the returned list is empty entries = _queue.getMessagesRangeOnTheQueue(5, 4); assertTrue(entries.size() == 0); - - // Get first QueueEntry & check id + + // Get first QueueEntry & check id entries = _queue.getMessagesRangeOnTheQueue(1, 1); assertTrue(entries.size() == 1); msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 1L); - + // Get 5th,6th,7th entries and check id's entries = _queue.getMessagesRangeOnTheQueue(5, 7); assertTrue(entries.size() == 3); @@ -379,17 +376,17 @@ assertEquals("Message ID was wrong", msgID, 6L); msgID = entries.get(2).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 7L); - + // Get 10th QueueEntry & check id entries = _queue.getMessagesRangeOnTheQueue(10, 10); assertTrue(entries.size() == 1); msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 10L); - + // Get non-existent 11th QueueEntry & check returned set was empty entries = _queue.getMessagesRangeOnTheQueue(11, 11); assertTrue(entries.size() == 0); - + // Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs entries = _queue.getMessagesRangeOnTheQueue(9, 11); assertTrue(entries.size() == 2); @@ -398,35 +395,51 @@ msgID = entries.get(1).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 10L); } - + public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); + final IncomingMessage msg = new IncomingMessage(1L, info, null); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - + final ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + // Send persistent message + qs.add(_queue); - msg.enqueue(qs); msg.routingComplete(_store, new MessageHandleFactory()); - _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); - + _store.storeMessageMetaData(new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); + + + Transaction txn = new AutoCommitTransaction(_store); + + txn.enqueue(qs, msg, new Transaction.Action() + { + public void postCommit() + { + msg.enqueue(qs); + } + + public void onRollback() + { + } + }); + + + // Check that it is enqueued AMQQueue data = _store.getMessages().get(1L); - assertNotNull(data); - + assertNull(data); + // Dequeue message MockQueueEntry entry = new MockQueueEntry(); - AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); - + AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory()); + entry.setMessage(amqmsg); - _queue.dequeue(null, entry); - + _queue.dequeue(entry); + // Check that it is dequeued data = _store.getMessages().get(1L); assertNull(data); @@ -434,29 +447,13 @@ // FIXME: move this to somewhere useful - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody) { final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, null, false); - try - { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } - + amqMessageHandle.setPublishAndContentHeaderBody(publishBody, contentHeaderBody); return amqMessageHandle; } @@ -468,11 +465,16 @@ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) throws AMQException { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new BasicContentHeaderProperties(), 0)); + + } + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, ContentHeaderBody chb) + throws AMQException + { + super(createMessageHandle(messageId, publishBody, chb), chb, 0, publishBody); _tag = tag; } - public boolean incrementReference() { _count++; @@ -489,7 +491,7 @@ assertEquals("Wrong count for message with tag " + _tag, expected, _count); } } - + protected AMQMessage createMessage(Long id) throws AMQException { AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Sun Oct 11 15:10:43 2009 @@ -29,15 +29,9 @@ import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQPriorityQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.queue.ExchangeBinding; -import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.txn.Transaction; +import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.framing.AMQShortString; @@ -344,22 +338,21 @@ MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey); - IncomingMessage currentMessage = null; + final IncomingMessage currentMessage; try { currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(), messageInfo, - new NonTransactionalContext(_virtualHost.getMessageStore(), - new StoreContext(), null, null), new InternalTestProtocolSession(_virtualHost)); } catch (AMQException e) { fail(e.getMessage()); + //help compiler - next line never reached + throw new RuntimeException(); } - currentMessage.setMessageStore(_virtualHost.getMessageStore()); currentMessage.setExchange(directExchange); ContentHeaderBody headerBody = new ContentHeaderBody(); @@ -379,14 +372,9 @@ currentMessage.setExpiration(); - try - { - currentMessage.route(); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + + currentMessage.route(); + try { @@ -400,14 +388,32 @@ // check and deliver if header says body length is zero if (currentMessage.allContentReceived()) { - try - { - currentMessage.deliverToQueues(); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + // TODO Deliver to queues + Transaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore()); + final List<AMQQueue> destinationQueues = currentMessage.getDestinationQueues(); + trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new Transaction.Action() { + public void postCommit() + { + try + { + AMQMessage message = new AMQMessage(currentMessage.getMessageHandle(), currentMessage.getContentHeader(), currentMessage.getSize() ,currentMessage.getMessagePublishInfo()); + + for(AMQQueue queue : destinationQueues) + { + QueueEntry entry = queue.enqueue(message); + } + } + catch (AMQException e) + { + e.printStackTrace(); + } + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); } } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Sun Oct 11 15:10:43 2009 @@ -55,7 +55,7 @@ { } - public void removeMessage(StoreContext s, Long messageId) + public void removeMessage(Long messageId) { } @@ -100,6 +100,23 @@ { } + public StoreFuture commitTranAsync(StoreContext context) throws AMQException + { + commitTran(context); + return new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + } + public void abortTran(StoreContext storeContext) throws AMQException { } @@ -114,22 +131,26 @@ return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + public void storeContentBodyChunk( + Long messageId, + int index, + ContentChunk contentBody, + boolean lastContentBody) throws AMQException { } - public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException + public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException { } - public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(Long messageId) throws AMQException { return null; } - public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException + public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException { return null; } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Sun Oct 11 15:10:43 2009 @@ -29,6 +29,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.queue.MessageMetaData; /** * Tests that reference counting works correctly with AMQMessage and the message store @@ -37,14 +38,12 @@ { private TestMemoryMessageStore _store; - private StoreContext _storeContext = new StoreContext(); - protected void setUp() throws Exception { super.setUp(); _store = new TestMemoryMessageStore(); - StoreContext.setCurrentContext(_storeContext); + } /** @@ -86,9 +85,13 @@ final long messageId = _store.getNewMessageId(); AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); + + MessageMetaData mmd = messageHandle.setPublishAndContentHeaderBody(info, chb); + _store.storeMessageMetaData(messageId, mmd); + + AMQMessage message = new AMQMessage(messageHandle, - _storeContext,info); + chb, chb.bodySize,info); message = message.takeReference(); @@ -145,9 +148,12 @@ final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); + + MessageMetaData mmd = messageHandle.setPublishAndContentHeaderBody(info, chb); + _store.storeMessageMetaData(messageId, mmd); + AMQMessage message = new AMQMessage(messageHandle, - _storeContext, + chb, chb.bodySize, info); Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sun Oct 11 15:10:43 2009 @@ -47,6 +47,10 @@ private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); + private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); + + private static final AtomicLong idGenerator = new AtomicLong(0); // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); @@ -88,7 +92,12 @@ public SubscriptionAcquiredState getOwningState() { - return new QueueEntry.SubscriptionAcquiredState(this); + return _owningState; + } + + public QueueEntry.SubscriptionAssignedState getAssignedState() + { + return _assignedState; } public LogActor getLogActor() Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Sun Oct 11 15:10:43 2009 @@ -41,12 +41,10 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.util.MockChannel; -import java.security.Principal; public class InternalBrokerBaseCase extends TestCase { @@ -55,7 +53,6 @@ protected MockChannel _channel; protected InternalTestProtocolSession _session; protected VirtualHost _virtualHost; - protected StoreContext _storeContext = new StoreContext(); protected AMQQueue _queue; protected AMQShortString QUEUE_NAME; Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java Sun Oct 11 15:10:43 2009 @@ -28,21 +28,43 @@ public class SimpleConnectionTest extends TestCase { - public void testConnection() +/* public void testConnection() { try { AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test"); + + QueueSession s = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueSender p = s.createSender(new AMQQueue("amq.direct", "queue")); - p.send(s.createTextMessage("test")); + for(int i = 0; i < 6000; i++) + { + p.send(s.createTextMessage("test("+i+")")); + } QueueReceiver r = s.createReceiver(new AMQQueue("amq.direct", "queue")); conn.start(); - Message m = r.receive(); - - Thread.sleep(60000L); + Thread.sleep(1000L); + for(int i = 0; i < 3000; i++) + { + Message m = r.receive(); + } conn.close(); + + conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test"); + s = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + r = s.createReceiver(new AMQQueue("amq.direct", "queue")); + conn.start(); + Message m; + int rcvCnt = 0; + while((m = r.receive(1000))!= null) + { + rcvCnt++; + } + System.out.print(rcvCnt); + + Thread.sleep(60000l); + } catch (AMQException e) { @@ -61,4 +83,68 @@ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } -} + + + public void testConnection2() + { + try + { + AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test"); + AMQConnection conn2 = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test"); + + AMQQueue amqQueue = new AMQQueue("amq.direct", "queue"); + + QueueSession s = conn.createQueueSession(true, Session.SESSION_TRANSACTED); + QueueSender p = s.createSender(amqQueue); + + QueueSession s2 = conn2.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueReceiver r2 = s2.createReceiver(amqQueue); + r2.setMessageListener(new MessageListener() + { + + public void onMessage(Message message) + { + try + { + System.out.println("***************************************************************************"); + System.out.println("***************************************************************************"); + System.out.println("** " +((TextMessage)message).getText()); + System.out.println("***************************************************************************"); + System.out.println("***************************************************************************"); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + }); + conn2.start(); + + for(int i = 0; i < 6000; i++) + { + p.send(s.createTextMessage("test("+i+")")); + if(i%10 == 0) + { Thread.sleep(5000); + s.commit(); + + } + } + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (URLSyntaxException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +*/} Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java Sun Oct 11 15:10:43 2009 @@ -215,7 +215,7 @@ List<String> results = _monitor.findMatches(BND_PREFIX); // We will have two binds as we bind all queues to the default exchange - assertEquals("Result set larger than expected.", 4, results.size()); + assertEquals("Result not as expected." + results, 4, results.size()); String messageID = "BND-1001"; @@ -241,7 +241,7 @@ String subject = fromSubject(log); - assertTrue("Routing Key does not start with TempQueue:"+AbstractTestLogSubject.getSlice("rk", subject), + assertTrue("Routing Key does not start with TempQueue:"+AbstractTestLogSubject.getSlice("rk", subject), AbstractTestLogSubject.getSlice("rk", subject).startsWith("TempQueue")); assertEquals("Virtualhost not correct.", "/test", AbstractTestLogSubject.getSlice("vh", subject)); Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun Oct 11 15:10:43 2009 @@ -183,6 +183,11 @@ return null; //To change body of implemented methods use File | Settings | File Templates. } + public QueueEntry.SubscriptionAssignedState getAssignedState() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void queueDeleted(AMQQueue queue) { } Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Sun Oct 11 15:10:43 2009 @@ -155,10 +155,10 @@ doPostDelay("close"); } - public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + public void removeMessage(Long messageId) throws AMQException { doPreDelay("removeMessage"); - _realStore.removeMessage(storeContext, messageId); + _realStore.removeMessage(messageId); doPostDelay("removeMessage"); } @@ -237,6 +237,24 @@ doPostDelay("commitTran"); } + public StoreFuture commitTranAsync(StoreContext context) throws AMQException + { + commitTran(context); + return new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + + } + public void abortTran(StoreContext context) throws AMQException { doPreDelay("abortTran"); @@ -260,32 +278,36 @@ return l; } - public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + public void storeContentBodyChunk( + Long messageId, + int index, + ContentChunk contentBody, + boolean lastContentBody) throws AMQException { doPreDelay("storeContentBodyChunk"); - _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + _realStore.storeContentBodyChunk(messageId, index, contentBody, lastContentBody); doPostDelay("storeContentBodyChunk"); } - public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException { doPreDelay("storeMessageMetaData"); - _realStore.storeMessageMetaData(context, messageId, messageMetaData); + _realStore.storeMessageMetaData(messageId, messageMetaData); doPostDelay("storeMessageMetaData"); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(Long messageId) throws AMQException { doPreDelay("getMessageMetaData"); - MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId); + MessageMetaData mmd = _realStore.getMessageMetaData(messageId); doPostDelay("getMessageMetaData"); return mmd; } - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException { doPreDelay("getContentBodyChunk"); - ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index); + ContentChunk c = _realStore.getContentBodyChunk(messageId, index); doPostDelay("getContentBodyChunk"); return c; } Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=824084&r1=824083&r2=824084&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Sun Oct 11 15:10:43 2009 @@ -311,11 +311,13 @@ AMQTopic topic = new AMQTopic(con, "testNoLocal"); - TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber noLocal = session1.createSubscriber(topic, "", true); + TopicSubscriber select = session1.createSubscriber(topic, "Selector = 'select'", false); TopicSubscriber normal = session1.createSubscriber(topic); + TopicPublisher publisher = session1.createPublisher(topic); con.start(); @@ -329,12 +331,12 @@ m = (TextMessage) normal.receive(1000); assertNotNull(m); session1.commit(); - + //test selector subscriber doesn't message m = (TextMessage) select.receive(1000); assertNull(m); session1.commit(); - + //test nolocal subscriber doesn't message m = (TextMessage) noLocal.receive(1000); if (m != null) @@ -349,12 +351,12 @@ publisher.publish(message); session1.commit(); - + //test normal subscriber gets message m = (TextMessage) normal.receive(1000); assertNotNull(m); session1.commit(); - + //test selector subscriber does get message m = (TextMessage) select.receive(1000); assertNotNull(m); @@ -365,7 +367,7 @@ assertNull(m); AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo"); - TopicSession session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); + TopicSession session2 = con2.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicPublisher publisher2 = session2.createPublisher(topic); @@ -386,18 +388,18 @@ session1.commit(); //test nolocal subscriber does message - m = (TextMessage) noLocal.receive(100); + m = (TextMessage) noLocal.receive(1000); assertNotNull(m); con.close(); con2.close(); } - + /** * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber * due to a selector can be leaked. - * @throws Exception + * @throws Exception */ public void testNonMatchingMessagesDoNotFillQueue() throws Exception { @@ -420,27 +422,27 @@ message = session.createTextMessage("non-matching 1"); publisher.publish(message); session.commit(); - + // Send and consume matching message message = session.createTextMessage("hello"); message.setStringProperty("Selector", "select"); publisher.publish(message); session.commit(); - + m = (TextMessage) selector.receive(1000); assertNotNull("should have received message", m); assertEquals("Message contents were wrong", "hello", m.getText()); - + // Send non-matching message message = session.createTextMessage("non-matching 2"); publisher.publish(message); session.commit(); - + // Assert queue count is 0 long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic); assertEquals("Queue depth was wrong", 0, depth); - + } public static junit.framework.Test suite() --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
