Pushed the message modifications. At this point message encoding/decoding is handled by the new codec.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a354d9e9 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a354d9e9 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a354d9e9 Branch: refs/heads/rajith-codec Commit: a354d9e92820805475c41ba58648ae130f47bd9a Parents: 19bc8cb Author: Rajith Attapattu <[email protected]> Authored: Wed May 13 12:10:25 2015 -0400 Committer: Rajith Attapattu <[email protected]> Committed: Wed May 13 12:10:25 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/proton/Proton.java | 31 +- .../amqp/messaging/MessageAnnotations.java | 1 - .../apache/qpid/proton/message2/Message.java | 23 +- .../qpid/proton/message2/MessageImpl2.java | 519 +++++++++---------- .../apache/qpid/proton/messenger/Messenger.java | 2 +- .../proton/messenger/impl/MessengerImpl.java | 2 +- 6 files changed, 284 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a354d9e9/proton-j/src/main/java/org/apache/qpid/proton/Proton.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java index 39b04e5..ab9ea5d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java @@ -21,25 +21,22 @@ package org.apache.qpid.proton; import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.Codec; -import org.apache.qpid.proton.codec.Data; + import org.apache.qpid.proton.driver.Driver; -import org.apache.qpid.proton.engine.Engine; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Engine; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.SslPeerDetails; import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message2.ApplicationProperties; +import org.apache.qpid.proton.message2.DeliveryAnnotations; +import org.apache.qpid.proton.message2.Footer; +import org.apache.qpid.proton.message2.Header; +import org.apache.qpid.proton.message2.Message; +import org.apache.qpid.proton.message2.MessageAnnotations; +import org.apache.qpid.proton.message2.Properties; +import org.apache.qpid.proton.message2.Section; import org.apache.qpid.proton.messenger.Messenger; public final class Proton @@ -74,11 +71,6 @@ public final class Proton return Engine.sslPeerDetails(hostname, port); } - public static Data data(long capacity) - { - return Codec.data(capacity); - } - public static Message message() { return Message.Factory.create(); @@ -109,5 +101,4 @@ public final class Proton { return Driver.Factory.create(); } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a354d9e9/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java index 9bf82d6..930ca39 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java @@ -30,7 +30,6 @@ import java.util.Map; public final class MessageAnnotations implements Section { - private final Map<Symbol, Object> _value; public MessageAnnotations(Map<Symbol, Object> value) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a354d9e9/proton-j/src/main/java/org/apache/qpid/proton/message2/Message.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message2/Message.java b/proton-j/src/main/java/org/apache/qpid/proton/message2/Message.java index 97d90ac..96f1045 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/message2/Message.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/message2/Message.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.proton.message2; -import org.apache.qpid.proton.amqp.messaging.Section; +import java.util.Map; + +import javax.xml.soap.MessageFactory; + import org.apache.qpid.proton.message.MessageError; import org.apache.qpid.proton.message.MessageFormat; -import org.apache.qpid.proton.message.impl.MessageImpl; /** * Represents a Message within Proton. @@ -45,9 +47,8 @@ public interface Message MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) { - //return new MessageImpl2(header, deliveryAnnotations, messageAnnotations, properties, applicationProperties, - // body, footer); - return new MessageImpl2(); + return new MessageImpl2(header, deliveryAnnotations, messageAnnotations, properties, applicationProperties, + body, footer); } } @@ -127,13 +128,13 @@ public interface Message Header getHeader(); - DeliveryAnnotations getDeliveryAnnotations(); + Map<String, Object> getDeliveryAnnotations(); - MessageAnnotations getMessageAnnotations(); + Map<String, Object> getMessageAnnotations(); Properties getProperties(); - ApplicationProperties getApplicationProperties(); + Map<Object, Object> getApplicationProperties(); Object getBody(); @@ -143,13 +144,13 @@ public interface Message void setHeader(Header header); - void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations); + void setDeliveryAnnotations(Map<String, Object> deliveryAnnotations); - void setMessageAnnotations(MessageAnnotations messageAnnotations); + void setMessageAnnotations(Map<String, Object> messageAnnotations); void setProperties(Properties properties); - void setApplicationProperties(ApplicationProperties applicationProperties); + void setApplicationProperties(Map<Object, Object> props); void setBody(Object body); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a354d9e9/proton-j/src/main/java/org/apache/qpid/proton/message2/MessageImpl2.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message2/MessageImpl2.java b/proton-j/src/main/java/org/apache/qpid/proton/message2/MessageImpl2.java index 26a7b0c..fc07af0 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/message2/MessageImpl2.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/message2/MessageImpl2.java @@ -17,68 +17,69 @@ * specific language governing permissions and limitations * under the License. * -*/ + */ package org.apache.qpid.proton.message2; -import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Date; +import java.util.List; +import java.util.Map; import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedByte; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.*; -import org.apache.qpid.proton.codec.CompositeWritableBuffer; -import org.apache.qpid.proton.codec.DroppingWritableBuffer; -import org.apache.qpid.proton.codec.EncoderImpl; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.codec2.*; -import org.apache.qpid.proton.message.*; +import org.apache.qpid.proton.codec2.ByteArrayDecoder; +import org.apache.qpid.proton.codec2.ByteArrayEncoder; +import org.apache.qpid.proton.codec2.CodecHelper; +import org.apache.qpid.proton.codec2.POJOBuilder; +import org.apache.qpid.proton.message.MessageError; +import org.apache.qpid.proton.message.MessageFormat; public class MessageImpl2 implements Message { private final AMQPMessageFormat _parser = new AMQPMessageFormat(); private Header _header; + private DeliveryAnnotations _deliveryAnnotations; + private MessageAnnotations _messageAnnotations; + private Properties _properties; + private ApplicationProperties _applicationProperties; - private Object _body; + + private Section _body; + + private List<Section> _bodySections; + private Footer _footer; + private MessageFormat _format = MessageFormat.DATA; - - private static class EncoderDecoderPair { - /*DecoderImpl decoder = new DecoderImpl(); - EncoderImpl encoder = new EncoderImpl(decoder); - { - AMQPDefinedTypes.registerAllTypes(decoder, encoder); - }*/ - ByteArrayEncoder encoder = new ByteArrayEncoder(); - ByteArrayDecoder decoder = new ByteArrayDecoder(); - } - - private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() { - @Override protected EncoderDecoderPair initialValue() { + + private static class EncoderDecoderPair + { + ByteArrayEncoder encoder = new ByteArrayEncoder(); + + ByteArrayDecoder decoder = new ByteArrayDecoder(); + } + + private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() + { + @Override + protected EncoderDecoderPair initialValue() + { return new EncoderDecoderPair(); - } - }; + } + }; - /** - * @deprecated This constructor's visibility will be reduced to the default scope in a future release. - * Client code outside this module should use a {@link MessageFactory} instead - */ - @Deprecated public MessageImpl2() + // TODO reduce visibility before release. + public MessageImpl2() { } - /** - * @deprecated This constructor's visibility will be reduced to the default scope in a future release. - * Client code outside this module should use a {@link MessageFactory} instead - */ - @Deprecated public MessageImpl2(Header header, DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations, - Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) + // TODO reduce visibility before release. + public MessageImpl2(Header header, DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations, + Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) { _header = header; _deliveryAnnotations = deliveryAnnotations; @@ -87,6 +88,7 @@ public class MessageImpl2 implements Message _applicationProperties = applicationProperties; _body = body; _footer = footer; + } @Override @@ -95,19 +97,16 @@ public class MessageImpl2 implements Message return (_header == null || _header.getDurable() == null) ? false : _header.getDurable(); } - @Override public long getDeliveryCount() { return (_header == null ? 0l : _header.getDeliveryCount()); } - @Override public short getPriority() { - return (_header == null ? DEFAULT_PRIORITY - : _header.getPriority()); + return (_header == null ? DEFAULT_PRIORITY : _header.getPriority()); } @Override @@ -121,7 +120,7 @@ public class MessageImpl2 implements Message { return (_header == null ? 0l : _header.getTtl()); } - + @Override public void setDurable(boolean durable) { @@ -154,8 +153,8 @@ public class MessageImpl2 implements Message return; } } - //TODO - _header.setTtl((int)ttl); + // TODO + _header.setTtl((int) ttl); } @Override @@ -172,7 +171,6 @@ public class MessageImpl2 implements Message _header.setDeliveryCount(deliveryCount); } - @Override public void setFirstAcquirer(boolean firstAcquirer) { @@ -224,7 +222,8 @@ public class MessageImpl2 implements Message @Override public long getCreationTime() { - return (_properties == null || _properties.getCreationTime() == null) ? 0l : _properties.getCreationTime().getTime(); + return (_properties == null || _properties.getCreationTime() == null) ? 0l : _properties.getCreationTime() + .getTime(); } @Override @@ -236,7 +235,7 @@ public class MessageImpl2 implements Message @Override public byte[] getUserId() { - if(_properties == null || _properties.getUserId() == null) + if (_properties == null || _properties.getUserId() == null) { return null; } @@ -267,7 +266,8 @@ public class MessageImpl2 implements Message @Override public long getExpiryTime() { - return (_properties == null || _properties.getAbsoluteExpiryTime() == null) ? 0l : _properties.getAbsoluteExpiryTime().getTime(); + return (_properties == null || _properties.getAbsoluteExpiryTime() == null) ? 0l : _properties + .getAbsoluteExpiryTime().getTime(); } @Override @@ -279,7 +279,8 @@ public class MessageImpl2 implements Message @Override public String getContentEncoding() { - return (_properties == null || _properties.getContentEncoding() == null) ? null : _properties.getContentEncoding(); + return (_properties == null || _properties.getContentEncoding() == null) ? null : _properties + .getContentEncoding(); } @Override @@ -291,9 +292,9 @@ public class MessageImpl2 implements Message @Override public void setGroupSequence(long groupSequence) { - if(_properties == null) + if (_properties == null) { - if(groupSequence == 0l) + if (groupSequence == 0l) { return; } @@ -308,9 +309,9 @@ public class MessageImpl2 implements Message @Override public void setUserId(byte[] userId) { - if(userId == null) + if (userId == null) { - if(_properties != null) + if (_properties != null) { _properties.setUserId(null); } @@ -318,7 +319,7 @@ public class MessageImpl2 implements Message } else { - if(_properties == null) + if (_properties == null) { _properties = new Properties(); } @@ -329,9 +330,9 @@ public class MessageImpl2 implements Message @Override public void setCreationTime(long creationTime) { - if(_properties == null) + if (_properties == null) { - if(creationTime == 0l) + if (creationTime == 0l) { return; } @@ -344,9 +345,9 @@ public class MessageImpl2 implements Message @Override public void setSubject(String subject) { - if(_properties == null) + if (_properties == null) { - if(subject == null) + if (subject == null) { return; } @@ -358,9 +359,9 @@ public class MessageImpl2 implements Message @Override public void setGroupId(String groupId) { - if(_properties == null) + if (_properties == null) { - if(groupId == null) + if (groupId == null) { return; } @@ -372,9 +373,9 @@ public class MessageImpl2 implements Message @Override public void setAddress(String to) { - if(_properties == null) + if (_properties == null) { - if(to == null) + if (to == null) { return; } @@ -386,9 +387,9 @@ public class MessageImpl2 implements Message @Override public void setExpiryTime(long absoluteExpiryTime) { - if(_properties == null) + if (_properties == null) { - if(absoluteExpiryTime == 0l) + if (absoluteExpiryTime == 0l) { return; } @@ -401,9 +402,9 @@ public class MessageImpl2 implements Message @Override public void setReplyToGroupId(String replyToGroupId) { - if(_properties == null) + if (_properties == null) { - if(replyToGroupId == null) + if (replyToGroupId == null) { return; } @@ -415,9 +416,9 @@ public class MessageImpl2 implements Message @Override public void setContentEncoding(String contentEncoding) { - if(_properties == null) + if (_properties == null) { - if(contentEncoding == null) + if (contentEncoding == null) { return; } @@ -429,9 +430,9 @@ public class MessageImpl2 implements Message @Override public void setContentType(String contentType) { - if(_properties == null) + if (_properties == null) { - if(contentType == null) + if (contentType == null) { return; } @@ -444,9 +445,9 @@ public class MessageImpl2 implements Message public void setReplyTo(String replyTo) { - if(_properties == null) + if (_properties == null) { - if(replyTo == null) + if (replyTo == null) { return; } @@ -459,9 +460,9 @@ public class MessageImpl2 implements Message public void setCorrelationId(Object correlationId) { - if(_properties == null) + if (_properties == null) { - if(correlationId == null) + if (correlationId == null) { return; } @@ -474,9 +475,9 @@ public class MessageImpl2 implements Message public void setMessageId(Object messageId) { - if(_properties == null) + if (_properties == null) { - if(messageId == null) + if (messageId == null) { return; } @@ -485,7 +486,6 @@ public class MessageImpl2 implements Message _properties.setMessageId(messageId); } - @Override public Header getHeader() { @@ -493,15 +493,29 @@ public class MessageImpl2 implements Message } @Override - public DeliveryAnnotations getDeliveryAnnotations() + public Map<String, Object> getDeliveryAnnotations() { - return _deliveryAnnotations; + if (_deliveryAnnotations != null) + { + return _deliveryAnnotations.getValue(); + } + else + { + return null; + } } @Override - public MessageAnnotations getMessageAnnotations() + public Map<String, Object> getMessageAnnotations() { - return _messageAnnotations; + if (_messageAnnotations != null) + { + return _messageAnnotations.getValue(); + } + else + { + return null; + } } @Override @@ -511,15 +525,35 @@ public class MessageImpl2 implements Message } @Override - public ApplicationProperties getApplicationProperties() + public Map<Object, Object> getApplicationProperties() { - return _applicationProperties; + if (_applicationProperties != null) + { + return _applicationProperties.getValue(); + } + else + { + return null; + } } @Override public Object getBody() { - return _body; + if (_bodySections.size() == 1) + { + return _body; + } + else if (_bodySections.size() > 1) + { + // TODO figure out the best way to provide a composite view. + // For now just return the list + return _bodySections; + } + else + { + return null; + } } @Override @@ -535,15 +569,15 @@ public class MessageImpl2 implements Message } @Override - public void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations) + public void setDeliveryAnnotations(Map<String, Object> deliveryAnnotations) { - _deliveryAnnotations = deliveryAnnotations; + _deliveryAnnotations = new DeliveryAnnotations(deliveryAnnotations); } @Override - public void setMessageAnnotations(MessageAnnotations messageAnnotations) + public void setMessageAnnotations(Map<String, Object> messageAnnotations) { - _messageAnnotations = messageAnnotations; + _messageAnnotations = new MessageAnnotations(messageAnnotations); } @Override @@ -553,15 +587,22 @@ public class MessageImpl2 implements Message } @Override - public void setApplicationProperties(ApplicationProperties applicationProperties) + public void setApplicationProperties(Map<Object, Object> props) { - _applicationProperties = applicationProperties; + _applicationProperties = new ApplicationProperties(props); } @Override public void setBody(Object body) { - _body = body; + if (body instanceof Section) + { + _body = (Section) body; + } + else + { + _body = new AmqpValue(body); + } } @Override @@ -583,155 +624,101 @@ public class MessageImpl2 implements Message _applicationProperties = null; _body = null; _footer = null; - Section section = null; - + POJOBuilder pb = new POJOBuilder(); decoder.decode(pb); - System.out.println(pb.build()); - - return decoder.getSize(); - - /*if(buffer.hasRemaining()) + List<Object> msgParts = (List<Object>) pb.build(); + for (Object part : msgParts) { - section = (Section) decoder - } - if(section instanceof Header) - { - _header = (Header) section; - if(buffer.hasRemaining()) - { - section = (Section) decoder. - } - else - { - section = null; - } - - } - if(section instanceof DeliveryAnnotations) - { - _deliveryAnnotations = (DeliveryAnnotations) section; - - if(buffer.hasRemaining()) - { - section = (Section) decoder.readObject(); - } - else - { - section = null; - } - - } - if(section instanceof MessageAnnotations) - { - _messageAnnotations = (MessageAnnotations) section; - - if(buffer.hasRemaining()) + if (part instanceof Header) { - section = (Section) decoder.readObject(); + _header = (Header) part; } - else + else if (part instanceof DeliveryAnnotations) { - section = null; + _deliveryAnnotations = (DeliveryAnnotations) part; } - - } - if(section instanceof Properties) - { - _properties = (Properties) section; - - if(buffer.hasRemaining()) + else if (part instanceof MessageAnnotations) { - section = (Section) decoder.readObject(); + _messageAnnotations = (MessageAnnotations) part; } - else + else if (part instanceof Properties) { - section = null; + _properties = (Properties) part; } - - } - if(section instanceof ApplicationProperties) - { - _applicationProperties = (ApplicationProperties) section; - - if(buffer.hasRemaining()) + else if (part instanceof ApplicationProperties) { - section = (Section) decoder.readObject(); + _applicationProperties = (ApplicationProperties) part; } - else + else if (part instanceof AmqpValue) { - section = null; + _body = ((AmqpValue) part); } - - } - if(section != null && !(section instanceof Footer)) - { - _body = section; - - if(buffer.hasRemaining()) + else if (part instanceof AmqpSequence || part instanceof Data) { - section = (Section) decoder.readObject(); + if (_body == null) + { + _body = ((Section) part); + } + else if (_bodySections != null) + { + _bodySections.add((Section) part); + } + else + { + _bodySections = new ArrayList<Section>(); + _bodySections.add(_body); + _bodySections.add((Section) part); + } } - else + else if (part instanceof Footer) { - section = null; + _footer = (Footer) part; } - - } - if(section instanceof Footer) - { - _footer = (Footer) section; - } - - decoder.setByteBuffer(null); */ - } - - //@Override - public int encode2(byte[] data, int offset, int length) - { - ByteBuffer buffer = ByteBuffer.wrap(data, offset, length); - WritableBuffer.ByteBufferWrapper first = new WritableBuffer.ByteBufferWrapper(buffer); - DroppingWritableBuffer second = new DroppingWritableBuffer(); - CompositeWritableBuffer composite = new CompositeWritableBuffer(first, second); - int start = composite.position(); - //encode(composite); - return composite.position() - start; + return decoder.getSize(); } - //@Override + // @Override public int encode(byte[] data, int offset, int length) { ByteArrayEncoder encoder = tlsCodec.get().encoder; encoder.init(data, 0, data.length); - if(getHeader() != null) + if (_header != null) { - getHeader().encode(encoder); + _header.encode(encoder); } - if(getDeliveryAnnotations() != null) + if (_deliveryAnnotations != null) { - getDeliveryAnnotations().encode(encoder); + _deliveryAnnotations.encode(encoder); } - if(getMessageAnnotations() != null) + if (_messageAnnotations != null) { - getMessageAnnotations().encode(encoder);; + _messageAnnotations.encode(encoder); } - if(getProperties() != null) + if (_properties != null) { - getProperties().encode(encoder);; + _properties.encode(encoder); } - if(getApplicationProperties() != null) + if (_applicationProperties != null) { - getApplicationProperties().encode(encoder);; + _applicationProperties.encode(encoder); + } + if (_bodySections != null) + { + for (Section section : _bodySections) + { + CodecHelper.encodeObject(encoder, section); + } } - if(getBody() != null) + else if (_body != null) { - CodecHelper.encodeObject(encoder, getBody()); + CodecHelper.encodeObject(encoder, _body); } - if(getFooter() != null) + if (_footer != null) { - getFooter().encode(encoder);; + _footer.encode(encoder); } return encoder.getPosition(); } @@ -741,38 +728,38 @@ public class MessageImpl2 implements Message { switch (_format) { - case DATA: - Binary binData; - if(data instanceof byte[]) - { - binData = new Binary((byte[])data); - } - else if(data instanceof Binary) - { - binData = (Binary) data; - } - else if(data instanceof String) - { - final String strData = (String) data; - byte[] bin = new byte[strData.length()]; - for(int i = 0; i < bin.length; i++) - { - bin[i] = (byte) strData.charAt(i); - } - binData = new Binary(bin); - } - else + case DATA: + Binary binData; + if (data instanceof byte[]) + { + binData = new Binary((byte[]) data); + } + else if (data instanceof Binary) + { + binData = (Binary) data; + } + else if (data instanceof String) + { + final String strData = (String) data; + byte[] bin = new byte[strData.length()]; + for (int i = 0; i < bin.length; i++) { - binData = null; + bin[i] = (byte) strData.charAt(i); } - //_body = new Data(binData); - break; - case TEXT: - _body = new AmqpValue(data == null ? "" : data.toString()); - break; - default: - // AMQP - _body = new AmqpValue(parseAMQPFormat((String) data)); + binData = new Binary(bin); + } + else + { + binData = null; + } + // _body = new Data(binData); + break; + case TEXT: + _body = new AmqpValue(data == null ? "" : data.toString()); + break; + default: + // AMQP + _body = new AmqpValue(parseAMQPFormat((String) data)); } } @@ -782,30 +769,31 @@ public class MessageImpl2 implements Message { switch (_format) { - case DATA: - if(_body instanceof Data) - { - return null; // ((Data)_body).getValue().getArray(); - } - else return null; - case AMQP: - if(_body instanceof AmqpValue) - { - return toAMQPFormat(((AmqpValue) _body).getValue()); - } - else - { - return null; - } - case TEXT: - if(_body instanceof AmqpValue) - { - final Object value = ((AmqpValue) _body).getValue(); - return value == null ? "" : value.toString(); - } + case DATA: + if (_body instanceof Data) + { + return null; // ((Data)_body).getValue().getArray(); + } + else return null; - default: + case AMQP: + if (_body instanceof AmqpValue) + { + return toAMQPFormat(((AmqpValue) _body).getValue()); + } + else + { return null; + } + case TEXT: + if (_body instanceof AmqpValue) + { + final Object value = ((AmqpValue) _body).getValue(); + return value == null ? "" : value.toString(); + } + return null; + default: + return null; } } @@ -839,6 +827,7 @@ public class MessageImpl2 implements Message public void clear() { _body = null; + _bodySections.clear(); } @Override @@ -848,16 +837,26 @@ public class MessageImpl2 implements Message } @Override - public org.apache.qpid.proton.amqp.messaging.Section getBodyAsSection(int i) + public Section getBodyAsSection(int i) { - // TODO Auto-generated method stub - return null; + if (_bodySections == null) + { + return _body; + } + else + { + // Let the list impl throw ArrayIndexOutofBounds + return _bodySections.get(i); + } } @Override - public void addBodySection(org.apache.qpid.proton.amqp.messaging.Section section) + public void addBodySection(Section section) { - // TODO Auto-generated method stub - + if (_bodySections == null) + { + _bodySections = new ArrayList<Section>(); + } + _bodySections.add(section); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a354d9e9/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java index 6d3f362..cd6d93d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java @@ -23,7 +23,7 @@ package org.apache.qpid.proton.messenger; import java.io.IOException; import org.apache.qpid.proton.TimeoutException; -import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message2.Message; import org.apache.qpid.proton.messenger.impl.MessengerImpl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a354d9e9/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java index e6475b9..2ce57e9 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java @@ -45,7 +45,7 @@ import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.Ssl; import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message2.Message; import org.apache.qpid.proton.messenger.Messenger; import org.apache.qpid.proton.messenger.MessengerException; import org.apache.qpid.proton.messenger.Status; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
