Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java Wed Dec 28 13:02:41 2011 @@ -29,12 +29,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.Type; - -import static org.apache.qpid.transport.util.Functions.*; +import org.apache.qpid.transport.*; /** @@ -194,18 +189,19 @@ abstract class AbstractDecoder implement public RangeSet readSequenceSet() { int count = readUint16()/8; - if (count == 0) + switch(count) { - return null; - } - else - { - RangeSet ranges = new RangeSet(); - for (int i = 0; i < count; i++) - { - ranges.add(readSequenceNo(), readSequenceNo()); - } - return ranges; + case 0: + return null; + case 1: + return Range.newInstance(readSequenceNo(), readSequenceNo()); + default: + RangeSet ranges = RangeSetFactory.createRangeSet(count); + for (int i = 0; i < count; i++) + { + ranges.add(readSequenceNo(), readSequenceNo()); + } + return ranges; } }
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java Wed Dec 28 13:02:41 2011 @@ -70,6 +70,16 @@ public final class BBEncoder extends Abs return slice; } + public int position() + { + return out.position(); + } + + public ByteBuffer underlyingBuffer() + { + return out; + } + private void grow(int size) { ByteBuffer old = out; Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Wed Dec 28 13:02:41 2011 @@ -26,13 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.ProtocolError; -import org.apache.qpid.transport.ProtocolEvent; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.codec.BBDecoder; /** @@ -198,12 +192,33 @@ public class Assembler implements Receiv break; case HEADER: command = getIncompleteCommand(channel); - List<Struct> structs = new ArrayList<Struct>(2); + List<Struct> structs = null; + DeliveryProperties deliveryProps = null; + MessageProperties messageProps = null; + while (dec.hasRemaining()) { - structs.add(dec.readStruct32()); + Struct struct = dec.readStruct32(); + if(struct instanceof DeliveryProperties && deliveryProps == null) + { + deliveryProps = (DeliveryProperties) struct; + } + else if(struct instanceof MessageProperties && messageProps == null) + { + messageProps = (MessageProperties) struct; + } + else + { + if(structs == null) + { + structs = new ArrayList<Struct>(2); + } + structs.add(struct); + } + } - command.setHeader(new Header(structs)); + command.setHeader(new Header(deliveryProps,messageProps,structs)); + if (frame.isLastSegment()) { setIncompleteCommand(channel, null); Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Wed Dec 28 13:02:41 2011 @@ -87,27 +87,35 @@ public final class Disassembler implemen } } + private final ByteBuffer _frameHeader = ByteBuffer.allocate(HEADER_SIZE); + + { + _frameHeader.order(ByteOrder.BIG_ENDIAN); + } + private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) { synchronized (sendlock) { - ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE); - data.order(ByteOrder.BIG_ENDIAN); + ByteBuffer data = _frameHeader; + _frameHeader.rewind(); + data.put(0, flags); data.put(1, type); data.putShort(2, (short) (size + HEADER_SIZE)); data.put(5, track); data.putShort(6, (short) channel); - data.position(HEADER_SIZE); + int limit = buf.limit(); buf.limit(buf.position() + size); - data.put(buf); - buf.limit(limit); - + data.rewind(); sender.send(data); + sender.send(buf); + buf.limit(limit); + } } @@ -179,7 +187,7 @@ public final class Disassembler implemen } } method.write(enc); - ByteBuffer methodSeg = enc.segment(); + int methodLimit = enc.position(); byte flags = FIRST_SEG; @@ -189,29 +197,44 @@ public final class Disassembler implemen flags |= LAST_SEG; } - ByteBuffer headerSeg = null; + int headerLimit = -1; if (payload) { final Header hdr = method.getHeader(); if (hdr != null) { - final Struct[] structs = hdr.getStructs(); - - for (Struct st : structs) + if(hdr.getDeliveryProperties() != null) + { + enc.writeStruct32(hdr.getDeliveryProperties()); + } + if(hdr.getMessageProperties() != null) + { + enc.writeStruct32(hdr.getMessageProperties()); + } + if(hdr.getNonStandardProperties() != null) { - enc.writeStruct32(st); + for (Struct st : hdr.getNonStandardProperties()) + { + enc.writeStruct32(st); + } } } - headerSeg = enc.segment(); + headerLimit = enc.position(); } synchronized (sendlock) { - fragment(flags, type, method, methodSeg); + ByteBuffer buf = enc.underlyingBuffer(); + buf.position(0); + buf.limit(methodLimit); + + fragment(flags, type, method, buf); if (payload) { ByteBuffer body = method.getBody(); - fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerSeg); + buf.limit(headerLimit); + buf.position(methodLimit); + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf); if (body != null) { fragment(LAST_SEG, SegmentType.BODY, method, body); Propchange: qpid/trunk/qpid/java/common/src/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java (original) +++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java Wed Dec 28 13:02:41 2011 @@ -581,10 +581,10 @@ public class PropertyFieldTableTest exte table.setBytes("bytes", bytes); table.setChar("char", 'c'); - table.setDouble("double", Double.MAX_VALUE); - table.setFloat("float", Float.MAX_VALUE); table.setInteger("int", Integer.MAX_VALUE); table.setLong("long", Long.MAX_VALUE); + table.setDouble("double", Double.MAX_VALUE); + table.setFloat("float", Float.MAX_VALUE); table.setShort("short", Short.MAX_VALUE); table.setString("string", "hello"); table.setString("null-string", null); @@ -823,9 +823,7 @@ public class PropertyFieldTableTest exte */ public void testCheckPropertyNamehasMaxLength() { - String oldVal = System.getProperty("STRICT_AMQP"); - System.setProperty("STRICT_AMQP", "true"); - FieldTable table = new FieldTable(); + FieldTable table = new FieldTable(true); StringBuffer longPropertyName = new StringBuffer(129); @@ -845,14 +843,6 @@ public class PropertyFieldTableTest exte } // so length should be zero Assert.assertEquals(0, table.getEncodedSize()); - if (oldVal != null) - { - System.setProperty("STRICT_AMQP", oldVal); - } - else - { - System.clearProperty("STRICT_AMQP"); - } } /** @@ -860,9 +850,7 @@ public class PropertyFieldTableTest exte */ public void testCheckPropertyNameStartCharacterIsLetter() { - String oldVal = System.getProperty("STRICT_AMQP"); - System.setProperty("STRICT_AMQP", "true"); - FieldTable table = new FieldTable(); + FieldTable table = new FieldTable(true); // Try a name that starts with a number try @@ -876,14 +864,6 @@ public class PropertyFieldTableTest exte } // so length should be zero Assert.assertEquals(0, table.getEncodedSize()); - if (oldVal != null) - { - System.setProperty("STRICT_AMQP", oldVal); - } - else - { - System.clearProperty("STRICT_AMQP"); - } } /** @@ -891,9 +871,7 @@ public class PropertyFieldTableTest exte */ public void testCheckPropertyNameStartCharacterIsHashorDollar() { - String oldVal = System.getProperty("STRICT_AMQP"); - System.setProperty("STRICT_AMQP", "true"); - FieldTable table = new FieldTable(); + FieldTable table = new FieldTable(true); // Try a name that starts with a number try @@ -906,14 +884,6 @@ public class PropertyFieldTableTest exte fail("property name are allowed to start with # and $s"); } - if (oldVal != null) - { - System.setProperty("STRICT_AMQP", oldVal); - } - else - { - System.clearProperty("STRICT_AMQP"); - } } /** Propchange: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java (original) +++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java Wed Dec 28 13:02:41 2011 @@ -60,7 +60,7 @@ public class RangeSetTest extends TestCa public void test1() { - RangeSet ranges = new RangeSet(); + RangeSet ranges = RangeSetFactory.createRangeSet(); ranges.add(5, 10); check(ranges); ranges.add(15, 20); @@ -77,7 +77,7 @@ public class RangeSetTest extends TestCa public void test2() { - RangeSet rs = new RangeSet(); + RangeSet rs = RangeSetFactory.createRangeSet(); check(rs); rs.add(1); @@ -128,7 +128,7 @@ public class RangeSetTest extends TestCa public void testAddSelf() { - RangeSet a = new RangeSet(); + RangeSet a = RangeSetFactory.createRangeSet(); a.add(0, 8); check(a); a.add(0, 8); @@ -141,8 +141,8 @@ public class RangeSetTest extends TestCa public void testIntersect1() { - Range a = new Range(0, 10); - Range b = new Range(9, 20); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(9, 20); Range i1 = a.intersect(b); Range i2 = b.intersect(a); assertEquals(i1.getUpper(), 10); @@ -153,16 +153,16 @@ public class RangeSetTest extends TestCa public void testIntersect2() { - Range a = new Range(0, 10); - Range b = new Range(11, 20); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(11, 20); assertNull(a.intersect(b)); assertNull(b.intersect(a)); } public void testIntersect3() { - Range a = new Range(0, 10); - Range b = new Range(3, 5); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(3, 5); Range i1 = a.intersect(b); Range i2 = b.intersect(a); assertEquals(i1.getUpper(), 5); @@ -173,14 +173,14 @@ public class RangeSetTest extends TestCa public void testSubtract1() { - Range a = new Range(0, 10); + Range a = Range.newInstance(0, 10); assertTrue(a.subtract(a).isEmpty()); } public void testSubtract2() { - Range a = new Range(0, 10); - Range b = new Range(20, 30); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(20, 30); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); @@ -190,8 +190,8 @@ public class RangeSetTest extends TestCa public void testSubtract3() { - Range a = new Range(20, 30); - Range b = new Range(0, 10); + Range a = Range.newInstance(20, 30); + Range b = Range.newInstance(0, 10); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); @@ -201,8 +201,8 @@ public class RangeSetTest extends TestCa public void testSubtract4() { - Range a = new Range(0, 10); - Range b = new Range(3, 5); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(3, 5); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 2); Range low = ranges.get(0); @@ -215,8 +215,8 @@ public class RangeSetTest extends TestCa public void testSubtract5() { - Range a = new Range(0, 10); - Range b = new Range(3, 20); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(3, 20); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); @@ -226,8 +226,8 @@ public class RangeSetTest extends TestCa public void testSubtract6() { - Range a = new Range(0, 10); - Range b = new Range(-10, 5); + Range a = Range.newInstance(0, 10); + Range b = Range.newInstance(-10, 5); List<Range> ranges = a.subtract(b); assertEquals(ranges.size(), 1); Range d = ranges.get(0); Modified: qpid/trunk/qpid/java/common/templates/method/version/MethodBodyClass.vm URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/templates/method/version/MethodBodyClass.vm?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/templates/method/version/MethodBodyClass.vm (original) +++ qpid/trunk/qpid/java/common/templates/method/version/MethodBodyClass.vm Wed Dec 28 13:02:41 2011 @@ -46,8 +46,9 @@ package org.apache.qpid.framing.amqp_$version.getMajor()_$version.getMinor(); -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataInput; +import org.apache.qpid.codec.MarkableDataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; @@ -58,7 +59,7 @@ public class ${javaClassName} extends AM { private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory() { - public AMQMethodBody newInstance(DataInputStream in, long size) throws AMQFrameDecodingException, IOException + public AMQMethodBody newInstance(MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException { return new ${javaClassName}(in); } @@ -86,7 +87,7 @@ public class ${javaClassName} extends AM // Constructor - public ${javaClassName}(DataInputStream buffer) throws AMQFrameDecodingException, IOException + public ${javaClassName}(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException { #foreach( $field in $method.ConsolidatedFields ) _$field.Name = read$field.getEncodingType()( buffer ); @@ -171,7 +172,7 @@ public class ${javaClassName} extends AM return size; } - public void writeMethodPayload(DataOutputStream buffer) throws IOException + public void writeMethodPayload(DataOutput buffer) throws IOException { #foreach( $field in $method.ConsolidatedFields ) write$field.getEncodingType()( buffer, _$field.Name ); Modified: qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.vm URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.vm?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.vm (original) +++ qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.vm Wed Dec 28 13:02:41 2011 @@ -30,10 +30,10 @@ package org.apache.qpid.framing; -import java.io.DataInputStream; import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.codec.MarkableDataInput; import java.util.Map; import java.util.HashMap; @@ -54,7 +54,7 @@ public abstract class MethodRegistry #end - public abstract AMQMethodBody convertToBody(DataInputStream in, long size) + public abstract AMQMethodBody convertToBody(MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException; public abstract int getMaxClassId(); Modified: qpid/trunk/qpid/java/common/templates/model/version/MethodRegistryClass.vm URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/templates/model/version/MethodRegistryClass.vm?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/templates/model/version/MethodRegistryClass.vm (original) +++ qpid/trunk/qpid/java/common/templates/model/version/MethodRegistryClass.vm Wed Dec 28 13:02:41 2011 @@ -35,10 +35,10 @@ import org.apache.qpid.protocol.AMQConst import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataInputStream; import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.codec.MarkableDataInput; public class MethodRegistry_$version.getMajor()_$version.getMinor() extends MethodRegistry @@ -87,7 +87,7 @@ public class MethodRegistry_$version.get } - public AMQMethodBody convertToBody(DataInputStream in, long size) + public AMQMethodBody convertToBody(MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException { int classId = in.readUnsignedShort(); Propchange: qpid/trunk/qpid/java/integrationtests/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/junit-toolkit/src/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/junit-toolkit/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/management/common/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/management/common/src/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/management/eclipse-plugin/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/management/eclipse-plugin/src/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/management/example/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/perftests/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/systests/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Wed Dec 28 13:02:41 2011 @@ -22,11 +22,11 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.logging.LogSubject; @@ -35,7 +35,7 @@ import java.util.HashMap; import java.util.Iterator; import java.nio.ByteBuffer; -public class SlowMessageStore implements MessageStore +public class SlowMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); private static final String DELAYS = "delays"; @@ -43,6 +43,7 @@ public class SlowMessageStore implements private HashMap<String, Long> _postDelays = new HashMap<String, Long>(); private long _defaultDelay = 0L; private MessageStore _realStore = new MemoryMessageStore(); + private DurableConfigurationStore _durableConfigurationStore = (MemoryMessageStore) _realStore; private static final String PRE = "pre"; private static final String POST = "post"; private String DEFAULT_DELAY = "default"; @@ -80,12 +81,13 @@ public class SlowMessageStore implements " does not."); } _realStore = (MessageStore) o; - _realStore.configureConfigStore(name, recoveryHandler, config, logSubject); - } - else - { - _realStore.configureConfigStore(name, recoveryHandler, config, logSubject); + if(o instanceof DurableConfigurationStore) + { + _durableConfigurationStore = (DurableConfigurationStore)o; + } } + _durableConfigurationStore.configureConfigStore(name, recoveryHandler, config, logSubject); + } private void configureDelays(Configuration config) @@ -178,28 +180,28 @@ public class SlowMessageStore implements public void createExchange(Exchange exchange) throws AMQStoreException { doPreDelay("createExchange"); - _realStore.createExchange(exchange); + _durableConfigurationStore.createExchange(exchange); doPostDelay("createExchange"); } public void removeExchange(Exchange exchange) throws AMQStoreException { doPreDelay("removeExchange"); - _realStore.removeExchange(exchange); + _durableConfigurationStore.removeExchange(exchange); doPostDelay("removeExchange"); } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { doPreDelay("bindQueue"); - _realStore.bindQueue(exchange, routingKey, queue, args); + _durableConfigurationStore.bindQueue(exchange, routingKey, queue, args); doPostDelay("bindQueue"); } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { doPreDelay("unbindQueue"); - _realStore.unbindQueue(exchange, routingKey, queue, args); + _durableConfigurationStore.unbindQueue(exchange, routingKey, queue, args); doPostDelay("unbindQueue"); } @@ -211,14 +213,14 @@ public class SlowMessageStore implements public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { doPreDelay("createQueue"); - _realStore.createQueue(queue, arguments); + _durableConfigurationStore.createQueue(queue, arguments); doPostDelay("createQueue"); } public void removeQueue(AMQQueue queue) throws AMQStoreException { doPreDelay("removeQueue"); - _realStore.removeQueue(queue); + _durableConfigurationStore.removeQueue(queue); doPostDelay("removeQueue"); } @@ -268,19 +270,19 @@ public class SlowMessageStore implements _underlying = underlying; } - public void enqueueMessage(TransactionLogResource queue, Long messageId) + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { doPreDelay("enqueueMessage"); - _underlying.enqueueMessage(queue, messageId); + _underlying.enqueueMessage(queue, message); doPostDelay("enqueueMessage"); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { doPreDelay("dequeueMessage"); - _underlying.dequeueMessage(queue, messageId); + _underlying.dequeueMessage(queue, message); doPostDelay("dequeueMessage"); } @@ -313,7 +315,7 @@ public class SlowMessageStore implements public void updateQueue(AMQQueue queue) throws AMQStoreException { doPreDelay("updateQueue"); - _realStore.updateQueue(queue); + _durableConfigurationStore.updateQueue(queue); doPostDelay("updateQueue"); } Propchange: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/testkit/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + Propchange: qpid/trunk/qpid/java/tools/src/main/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Wed Dec 28 13:02:41 2011 @@ -0,0 +1,2 @@ +*.iml + --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
