Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Mon Jul 20 19:05:05 2009 @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -93,10 +94,11 @@ public void process() throws AMQException { - _message.incrementReference(); + MessageReference ref = _message.newReference(); try { - QueueEntry entry = _queue.enqueue(getStoreContext(),_message); + StoreContext.setCurrentContext(getStoreContext()); + QueueEntry entry = _queue.enqueue(_message); if(entry.immediateAndNotDelivered()) { @@ -105,7 +107,8 @@ } finally { - _message.decrementReference(getStoreContext()); + ref.release(); + StoreContext.clearCurrentContext(); } } }
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Mon Jul 20 19:05:05 2009 @@ -90,8 +90,13 @@ public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException { - QueueEntry entry = queue.enqueue(_storeContext, message); - + StoreContext.setCurrentContext(getStoreContext()); + + QueueEntry entry = queue.enqueue(message); + + StoreContext.clearCurrentContext(); + + //following check implements the functionality //required by the 'immediate' flag: if(entry.immediateAndNotDelivered()) @@ -128,7 +133,7 @@ { if (debug) { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); + _log.debug("Discarding message: " + message.getMessage().getMessageNumber()); } if(message.getMessage().isPersistent()) { @@ -171,7 +176,7 @@ if (debug) { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); + _log.debug("Discarding message: " + msg.getMessage().getMessageNumber()); } if(msg.getMessage().isPersistent()) { @@ -187,7 +192,7 @@ if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.getMessage().getMessageId()); + msg.getMessage().getMessageNumber()); } } if(_inTran) Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Mon Jul 20 19:05:05 2009 @@ -26,6 +26,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -100,7 +101,7 @@ for (QueueEntry entry : messages) { - AMQMessage msg = entry.getMessage(); + ServerMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; @@ -112,7 +113,7 @@ // Show general message information hex.add(Show.Columns.ID.name()); - ascii.add(msg.getMessageId().toString()); + ascii.add(msg.getMessageNumber().toString()); hex.add(Console.ROW_DIVIDER); ascii.add(Console.ROW_DIVIDER); @@ -136,110 +137,114 @@ hex.add(Console.ROW_DIVIDER); ascii.add(Console.ROW_DIVIDER); - Iterator bodies = msg.getContentBodyIterator(); - if (bodies.hasNext()) + if(msg instanceof AMQMessage) { - hex.add("Hex"); - hex.add(Console.ROW_DIVIDER); + Iterator bodies = ((AMQMessage)msg).getContentBodyIterator(); + if (bodies.hasNext()) + { + hex.add("Hex"); + hex.add(Console.ROW_DIVIDER); - ascii.add("ASCII"); - ascii.add(Console.ROW_DIVIDER); - while (bodies.hasNext()) - { - ContentChunk chunk = (ContentChunk) bodies.next(); + ascii.add("ASCII"); + ascii.add(Console.ROW_DIVIDER); - //Duplicate so we don't destroy original data :) - ByteBuffer hexBuffer = chunk.getData().duplicate(); + while (bodies.hasNext()) + { + ContentChunk chunk = (ContentChunk) bodies.next(); - ByteBuffer charBuffer = hexBuffer.duplicate(); + //Duplicate so we don't destroy original data :) + ByteBuffer hexBuffer = chunk.getData().duplicate(); - Hex hexencoder = new Hex(); + ByteBuffer charBuffer = hexBuffer.duplicate(); - while (hexBuffer.hasRemaining()) - { - byte[] line = new byte[LINE_SIZE]; + Hex hexencoder = new Hex(); - int bufsize = hexBuffer.remaining(); - if (bufsize < LINE_SIZE) - { - hexBuffer.get(line, 0, bufsize); - } - else + while (hexBuffer.hasRemaining()) { - bufsize = line.length; - hexBuffer.get(line); - } + byte[] line = new byte[LINE_SIZE]; - byte[] encoded = hexencoder.encode(line); + int bufsize = hexBuffer.remaining(); + if (bufsize < LINE_SIZE) + { + hexBuffer.get(line, 0, bufsize); + } + else + { + bufsize = line.length; + hexBuffer.get(line); + } - try - { - String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING); - String hexLine = ""; + byte[] encoded = hexencoder.encode(line); - int strKength = encStr.length(); - for (int c = 0; c < strKength; c++) + try { - hexLine += encStr.charAt(c); + String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING); + String hexLine = ""; - if (c % 2 == 1 && SPACE_BYTES) + int strKength = encStr.length(); + for (int c = 0; c < strKength; c++) { - hexLine += BYTE_SPACER; + hexLine += encStr.charAt(c); + + if (c % 2 == 1 && SPACE_BYTES) + { + hexLine += BYTE_SPACER; + } } - } - hex.add(hexLine); - } - catch (UnsupportedEncodingException e) - { - _console.println(e.getMessage()); - return null; + hex.add(hexLine); + } + catch (UnsupportedEncodingException e) + { + _console.println(e.getMessage()); + return null; + } } - } - while (charBuffer.hasRemaining()) - { - String asciiLine = ""; - - for (int pos = 0; pos < LINE_SIZE; pos++) + while (charBuffer.hasRemaining()) { - if (charBuffer.hasRemaining()) - { - byte ch = charBuffer.get(); + String asciiLine = ""; - if (isPrintable(ch)) + for (int pos = 0; pos < LINE_SIZE; pos++) + { + if (charBuffer.hasRemaining()) { - asciiLine += (char) ch; + byte ch = charBuffer.get(); + + if (isPrintable(ch)) + { + asciiLine += (char) ch; + } + else + { + asciiLine += NON_PRINTING_ASCII_CHAR; + } + + if (SPACE_BYTES) + { + asciiLine += BYTE_SPACER; + } } else { - asciiLine += NON_PRINTING_ASCII_CHAR; - } - - if (SPACE_BYTES) - { - asciiLine += BYTE_SPACER; + break; } } - else - { - break; - } - } - ascii.add(asciiLine); + ascii.add(asciiLine); + } } } - } - else - { - List<String> result = new LinkedList<String>(); + else + { + List<String> result = new LinkedList<String>(); - display.add(result); - result.add("No ContentBodies"); + display.add(result); + result.add("No ContentBodies"); + } } } @@ -252,7 +257,7 @@ return display; } - private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg, + private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg, String title, boolean routing, boolean headers, boolean messageHeaders) { List<QueueEntry> single = new LinkedList<QueueEntry>(); Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Mon Jul 20 19:05:05 2009 @@ -172,7 +172,7 @@ { for (QueueEntry msg : messages) { - ids.add(msg.getMessage().getMessageId()); + ids.add(msg.getMessage().getMessageNumber()); } } } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Mon Jul 20 19:05:05 2009 @@ -26,9 +26,9 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -337,30 +337,24 @@ //Add create the table of data for (QueueEntry entry : messages) { - AMQMessage msg = entry.getMessage(); + ServerMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; } - id.add(msg.getMessageId().toString()); + id.add(msg.getMessageNumber().toString()); size.add("" + msg.getSize()); arrival.add("" + msg.getArrivalTime()); - try - { - ispersitent.add(msg.isPersistent() ? "true" : "false"); - } - catch (AMQException e) - { - ispersitent.add("n/a"); - } + ispersitent.add(msg.isPersistent() ? "true" : "false"); + isredelivered.add(msg.isRedelivered() ? "true" : "false"); - isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); + isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false"); // msg.getMessageHandle(); @@ -368,7 +362,10 @@ try { - headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); + if(msg instanceof AMQMessage) + { + headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties); + } } catch (AMQException e) { @@ -417,7 +414,11 @@ MessagePublishInfo info = null; try { - info = msg.getMessagePublishInfo(); + if(msg instanceof AMQMessage) + { + info = ((AMQMessage)msg).getMessagePublishInfo(); + } + } catch (AMQException e) { @@ -457,14 +458,14 @@ return data; } - protected boolean includeMsg(AMQMessage msg, List<Long> msgids) + protected boolean includeMsg(ServerMessage msg, List<Long> msgids) { if (msgids == null) { return true; } - Long msgid = msg.getMessageId(); + Long msgid = msg.getMessageNumber(); boolean found = false; Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Mon Jul 20 19:05:05 2009 @@ -89,7 +89,7 @@ while(queueEntries.advance()) { QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry); + _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry); // Store the entry for future inspection _referenceList.add(entry); Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Mon Jul 20 19:05:05 2009 @@ -38,7 +38,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -163,7 +162,9 @@ }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); - _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); + StoreContext sc = StoreContext.setCurrentContext(new StoreContext()); + _map.add(deliveryTag, _queue.enqueue(message)); + StoreContext.setCurrentContext(sc); } _acked = acked; _unacked = unacked; Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Jul 20 19:05:05 2009 @@ -33,6 +33,8 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; @@ -255,9 +257,9 @@ * @throws AMQException */ @Override - public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException + public QueueEntry enqueue(ServerMessage msg) throws AMQException { - messages.add( new HeadersExchangeTest.Message(msg)); + messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); return new QueueEntry() { @@ -326,11 +328,6 @@ //To change body of implemented methods use File | Settings | File Templates. } - public String debugIdentity() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - public boolean immediateAndNotDelivered() { return false; //To change body of implemented methods use File | Settings | File Templates. @@ -438,7 +435,7 @@ } - public ContentHeaderBody getContentHeaderBody() + public ContentHeaderBody getContentHeader() { try { @@ -522,7 +519,7 @@ void route(Exchange exchange) throws AMQException { - exchange.route(_incoming); + _incoming.enqueue(exchange.route(_incoming)); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Mon Jul 20 19:05:05 2009 @@ -76,7 +76,7 @@ IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession); - _exchange.route(message); + message.enqueue(_exchange.route(message)); Assert.assertEquals(0, queue.getMessageCount()); } @@ -100,7 +100,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -140,7 +140,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -159,7 +159,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -198,7 +198,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -217,7 +217,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -236,7 +236,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -254,7 +254,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -294,7 +294,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -312,7 +312,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -352,7 +352,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -384,7 +384,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -425,7 +425,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -464,7 +464,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -495,7 +495,7 @@ private void routeMessage(final IncomingMessage message) throws AMQException { - _exchange.route(message); + message.enqueue(_exchange.route(message)); message.routingComplete(_store, new MessageHandleFactory()); message.deliverToQueues(); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Mon Jul 20 19:05:05 2009 @@ -22,16 +22,85 @@ import java.util.Map; import java.util.HashMap; +import java.util.Set; import junit.framework.TestCase; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.message.AMQMessageHeader; /** */ public class HeadersBindingTest extends TestCase { + + private class MockHeader implements AMQMessageHeader + { + + private final Map<String, Object> _headers = new HashMap<String, Object>(); + + public String getCorrelationId() + { + return null; + } + + public long getExpiration() + { + return 0; + } + + public String getMessageId() + { + return null; + } + + public byte getPriority() + { + return 0; + } + + public long getTimestamp() + { + return 0; + } + + public String getType() + { + return null; + } + + public String getReplyTo() + { + return null; + } + + public Object getHeader(String name) + { + return _headers.get(name); + } + + public boolean containsHeaders(Set<String> names) + { + return _headers.keySet().containsAll(names); + } + + public boolean containsHeader(String name) + { + return _headers.containsKey(name); + } + + public void setString(String key, String value) + { + setObject(key,value); + } + + public void setObject(String key, Object value) + { + _headers.put(key,value); + } + } + private FieldTable bindHeaders = new FieldTable(); - private FieldTable matchHeaders = new FieldTable(); + private MockHeader matchHeaders = new MockHeader(); public void testDefault_1() { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Mon Jul 20 19:05:05 2009 @@ -42,19 +42,19 @@ { // Enqueue messages in order - _queue.enqueue(null, createMessage(1L, (byte) 10)); - _queue.enqueue(null, createMessage(2L, (byte) 4)); - _queue.enqueue(null, createMessage(3L, (byte) 0)); + _queue.enqueue(createMessage(1L, (byte) 10)); + _queue.enqueue(createMessage(2L, (byte) 4)); + _queue.enqueue(createMessage(3L, (byte) 0)); // Enqueue messages in reverse order - _queue.enqueue(null, createMessage(4L, (byte) 0)); - _queue.enqueue(null, createMessage(5L, (byte) 4)); - _queue.enqueue(null, createMessage(6L, (byte) 10)); + _queue.enqueue(createMessage(4L, (byte) 0)); + _queue.enqueue(createMessage(5L, (byte) 4)); + _queue.enqueue(createMessage(6L, (byte) 10)); // Enqueue messages out of order - _queue.enqueue(null, createMessage(7L, (byte) 4)); - _queue.enqueue(null, createMessage(8L, (byte) 10)); - _queue.enqueue(null, createMessage(9L, (byte) 0)); + _queue.enqueue(createMessage(7L, (byte) 4)); + _queue.enqueue(createMessage(8L, (byte) 10)); + _queue.enqueue(createMessage(9L, (byte) 0)); // Register subscriber _queue.registerSubscription(_subscription, false); @@ -63,17 +63,17 @@ ArrayList<QueueEntry> msgs = _subscription.getMessages(); try { - assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId()); - assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId()); - assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId()); - - assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId()); - assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId()); - assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId()); - - assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId()); - assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId()); - assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId()); + assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber()); + assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber()); + assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber()); + + assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber()); + assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber()); + assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber()); + + assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber()); + assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber()); + assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber()); } catch (AssertionFailedError afe) { @@ -81,7 +81,7 @@ int index = 1; for (QueueEntry qe : msgs) { - System.err.println(index + ":" + qe.getMessage().getMessageId()); + System.err.println(index + ":" + qe.getMessage().getMessageNumber()); index++; } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Jul 20 19:05:05 2009 @@ -31,6 +31,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; @@ -160,7 +161,7 @@ return 0; //To change body of implemented methods use File | Settings | File Templates. } - public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException + public QueueEntry enqueue(ServerMessage message) throws AMQException { return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -280,7 +281,7 @@ return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override + public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. @@ -321,7 +322,7 @@ return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override + public void setMinimumAlertRepeatGap(long value) { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Jul 20 19:05:05 2009 @@ -49,11 +49,6 @@ } - public String debugIdentity() - { - return null; - } - public boolean delete() { return false; Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Jul 20 19:05:05 2009 @@ -185,7 +185,7 @@ // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); + _queue.enqueue(messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); // Check removing the subscription removes it's information from the queue @@ -196,7 +196,7 @@ 1 == _queue.getActiveConsumerCount()); AMQMessage messageB = createMessage(new Long (25)); - _queue.enqueue(null, messageB); + _queue.enqueue(messageB); QueueEntry entry = _subscription.getLastSeenEntry(); assertNull(entry); } @@ -204,7 +204,7 @@ public void testQueueNoSubscriber() throws AMQException, InterruptedException { AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); + _queue.enqueue(messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); @@ -223,7 +223,7 @@ // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); + _queue.enqueue(messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue @@ -261,7 +261,7 @@ _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); - _queue.enqueue(null, message); + _queue.enqueue(message); _queue.unregisterSubscription(_subscription); assertTrue("Queue was not deleted when subscription was removed", _queue.isDeleted()); @@ -272,7 +272,7 @@ _queue.registerSubscription(_subscription, false); Long id = new Long(26); AMQMessage message = createMessage(id); - _queue.enqueue(null, message); + _queue.enqueue(message); QueueEntry entry = _subscription.getLastSeenEntry(); entry.setRedelivered(true); _queue.resend(entry, _subscription); @@ -286,7 +286,7 @@ AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); @@ -302,7 +302,7 @@ Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); @@ -323,7 +323,7 @@ Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Mon Jul 20 19:05:05 2009 @@ -44,6 +44,7 @@ { super.setUp(); _store = new TestMemoryMessageStore(); + StoreContext.setCurrentContext(_storeContext); } /** @@ -96,7 +97,7 @@ assertEquals(1, _store.getMessageMetaDataMap().size()); - message.decrementReference(_storeContext); + message.decrementReference(); assertEquals(1, _store.getMessageMetaDataMap().size()); } @@ -158,7 +159,7 @@ assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); - message.decrementReference(_storeContext); + message.decrementReference(); assertEquals(1, _store.getMessageMetaDataMap().size()); } Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Mon Jul 20 19:05:05 2009 @@ -904,10 +904,13 @@ } } + public Object get(String key) + { + return get(new AMQShortString(key)); + } public Object get(AMQShortString key) { - return getObject(key); } Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Jul 20 19:05:05 2009 @@ -55,7 +55,7 @@ private static final Logger log = Logger.get(Connection.class); - enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } class DefaultConnectionListener implements ConnectionListener { @@ -118,7 +118,7 @@ sender.setIdleTimeout(idleTimeout); } - void setState(State state) + protected void setState(State state) { synchronized (lock) { Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Mon Jul 20 19:05:05 2009 @@ -52,13 +52,28 @@ { private SaslServer saslServer; + private List<Object> _locales; + private List<Object> _mechanisms; + private Map<String, Object> _clientProperties; + + + public ServerDelegate() + { + this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8")); + } + + protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales) + { + _clientProperties = clientProperties; + _mechanisms = mechanisms; + _locales = locales; + } public void init(Connection conn, ProtocolHeader hdr) { conn.send(new ProtocolHeader(1, 0, 10)); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - conn.connectionStart(null, Collections.EMPTY_LIST, utf8); + + conn.connectionStart(_clientProperties, _mechanisms, _locales); } @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok) @@ -77,8 +92,8 @@ try { - SaslServer ss = Sasl.createSaslServer - (mechanism, "AMQP", "localhost", null, null); + + SaslServer ss = createSaslServer(mechanism); if (ss == null) { conn.connectionClose @@ -95,6 +110,14 @@ } } + protected SaslServer createSaslServer(String mechanism) + throws SaslException + { + SaslServer ss = Sasl.createSaslServer + (mechanism, "AMQP", "localhost", null, null); + return ss; + } + private void secure(Connection conn, byte[] response) { SaslServer ss = conn.getSaslServer(); @@ -133,9 +156,16 @@ @Override public void connectionOpen(Connection conn, ConnectionOpen open) { conn.connectionOpenOk(Collections.EMPTY_LIST); + conn.setState(OPEN); } + protected Session getSession(Connection conn, SessionDelegate delegate, SessionAttach atc) + { + return new Session(conn, delegate, new Binary(atc.getName()), 0); + } + + public Session getSession(Connection conn, SessionAttach atc) { return new Session(conn, new Binary(atc.getName()), 0); Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Jul 20 19:05:05 2009 @@ -82,7 +82,7 @@ private Binary name; private long expiry; private int channel; - private SessionDelegate delegate = new SessionDelegate(); + private SessionDelegate delegate; private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; @@ -112,9 +112,15 @@ private Thread resumer = null; - Session(Connection connection, Binary name, long expiry) + protected Session(Connection connection, Binary name, long expiry) + { + this(connection, new SessionDelegate(), name, expiry); + } + + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this.connection = connection; + this.delegate = delegate; this.name = name; this.expiry = expiry; initReceiver(); Added: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java?rev=795958&view=auto ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java (added) +++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java Mon Jul 20 19:05:05 2009 @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.*; + +public class SimpleConnectionTest extends TestCase +{ + public void testConnection() + { + try + { + AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test"); + QueueSession s = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSender p = s.createSender(new AMQQueue("amq.direct", "queue")); + p.send(s.createTextMessage("test")); + + QueueReceiver r = s.createReceiver(new AMQQueue("amq.direct", "queue")); + conn.start(); + Message m = r.receive(); + + Thread.sleep(60000L); + conn.close(); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (URLSyntaxException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
