Author: robbie
Date: Mon May 12 11:40:17 2014
New Revision: 1593932
URL: http://svn.apache.org/r1593932
Log:
QPIDJMS-20: initial work on using AMQP type encoding for sent/received
ObjectMessage
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageAmqpTypedDelegate.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageDelegate.java
- copied, changed from r1593931,
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageSerializedDelegate.java
- copied, changed from r1593931,
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpObjectMessageTest.java
- copied, changed from r1593931,
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
Removed:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
Mon May 12 11:40:17 2014
@@ -21,7 +21,10 @@ package org.apache.qpid.jms.engine;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.jms.impl.ClientProperties;
+import org.apache.qpid.jms.impl.ObjectMessageImpl;
import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
@@ -34,15 +37,28 @@ public class AmqpMessageFactory
{
Section body = message.getBody();
+ //TODO: This is a temporary hack, need to implement proper support for
the new
+ //message type annotation by generally rewriting the entire factory
method
+ AmqpGenericMessage msg = new AmqpGenericMessage(delivery, message,
amqpConnection);
+ if(msg.messageAnnotationExists(ClientProperties.X_OPT_JMS_MSG_TYPE))
+ {
+ UnsignedByte ub = (UnsignedByte)
msg.getMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE);
+ if(ub.shortValue() == ObjectMessageImpl.X_OPT_JMS_MSG_TYPE_VALUE)
+ {
+ boolean isJavaSerialized =
isContentType(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE, message);
+ return new AmqpObjectMessage(delivery, message,
amqpConnection, !isJavaSerialized);
+ }
+ }
+
if(body == null)
{
if(isContentType(AmqpTextMessage.CONTENT_TYPE, message))
{
return new AmqpTextMessage(message, delivery, amqpConnection);
}
- else if(isContentType(AmqpSerializedObjectMessage.CONTENT_TYPE,
message))
+ else
if(isContentType(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE, message))
{
- return new AmqpSerializedObjectMessage(delivery, message,
amqpConnection);
+ return new AmqpObjectMessage(delivery, message,
amqpConnection, false);
}
else if(isContentType(AmqpBytesMessage.CONTENT_TYPE, message) ||
isContentType(null, message))
{
@@ -59,9 +75,9 @@ public class AmqpMessageFactory
{
return new AmqpBytesMessage(message, delivery, amqpConnection);
}
- else if(isContentType(AmqpSerializedObjectMessage.CONTENT_TYPE,
message))
+ else
if(isContentType(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE, message))
{
- return new AmqpSerializedObjectMessage(delivery, message,
amqpConnection);
+ return new AmqpObjectMessage(delivery, message,
amqpConnection, false);
}
}
else if(body instanceof AmqpValue)
@@ -105,5 +121,4 @@ public class AmqpMessageFactory
return contentType.equals(message.getContentType());
}
}
-
}
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
Mon May 12 11:40:17 2014
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,6 +16,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
package org.apache.qpid.jms.engine;
@@ -24,19 +26,76 @@ import java.io.Serializable;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;
-public abstract class AmqpObjectMessage extends AmqpMessage
+public class AmqpObjectMessage extends AmqpMessage
{
+ private AmqpObjectMessageDelegate _delegate;
+ private boolean _useAmqpTypeEncoding = false;
+
public AmqpObjectMessage()
{
super();
+ setContentType(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE);
+ initDelegate(false);
}
- public AmqpObjectMessage(Message message, Delivery delivery,
AmqpConnection amqpConnection)
+ public AmqpObjectMessage(Delivery delivery, Message message,
AmqpConnection amqpConnection, boolean useAmqpTypes)
{
super(message, delivery, amqpConnection);
+
+ initDelegate(useAmqpTypes);
}
- public abstract void setObject(Serializable serializable) throws
IOException;
+ private void initDelegate(boolean useAmqpTypes)
+ {
+ if(!useAmqpTypes)
+ {
+ _delegate = new AmqpObjectMessageSerializedDelegate(this);
+ }
+ else
+ {
+ _delegate = new AmqpObjectMessageAmqpTypedDelegate(this);
+ }
+ }
+
+ /**
+ * Sets the serialized object as a data section in the underlying message,
or
+ * clears the body section if null.
+ */
+ public void setObject(Serializable serializable) throws IOException
+ {
+ _delegate.setObject(serializable);
+ }
- public abstract Serializable getObject() throws IOException,
ClassNotFoundException;
+ /**
+ * Returns the deserialized object, or null if no object data is present.
+ */
+ public Serializable getObject() throws IllegalStateException, IOException,
ClassNotFoundException
+ {
+ return _delegate.getObject();
+ }
+
+ public void setUseAmqpTypeEncoding(boolean useAmqpTypeEncoding) throws
ClassNotFoundException, IOException
+ {
+ if(_useAmqpTypeEncoding != useAmqpTypeEncoding)
+ {
+ Serializable existingObject = _delegate.getObject();
+
+ AmqpObjectMessageDelegate newDelegate = null;
+ if(useAmqpTypeEncoding)
+ {
+ newDelegate = new AmqpObjectMessageAmqpTypedDelegate(this);
+ }
+ else if(!useAmqpTypeEncoding)
+ {
+ newDelegate = new AmqpObjectMessageSerializedDelegate(this);
+ }
+
+ newDelegate.setObject(existingObject);
+
+ _delegate = newDelegate;
+ _useAmqpTypeEncoding = useAmqpTypeEncoding;
+
+ //TODO: ensure we only set content-type if we are using a Data
section
+ }
+ }
}
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageAmqpTypedDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageAmqpTypedDelegate.java?rev=1593932&view=auto
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageAmqpTypedDelegate.java
(added)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageAmqpTypedDelegate.java
Mon May 12 11:40:17 2014
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+
+public class AmqpObjectMessageAmqpTypedDelegate implements
AmqpObjectMessageDelegate
+{
+ private AmqpObjectMessage _parent;
+
+ public AmqpObjectMessageAmqpTypedDelegate(AmqpObjectMessage
amqpObjectMessage)
+ {
+ _parent = amqpObjectMessage;
+ }
+
+ /**
+ * Sets the Serializable object as an AmqpValue/Data/AmqpSequence section
+ * in the underlying message, or clears the body section if null.
+ */
+ @Override
+ public void setObject(Serializable serializable) throws IOException
+ {
+ if(serializable == null)
+ {
+ //TODO: verify whether not sending a body is OK, send some form of
+ //null (AmqpValue containing null) instead if it isn't?
+ _parent.getMessage().setBody(null);
+ }
+ else if(isSupportedAmqpValueObjectType(serializable))
+ {
+ //TODO: This is a temporary hack, we actually need to take a
snapshot of the object at this point in time, not simply set the object itself
into the Proton message.
+ //We will need to encode it now, first to save the snapshot to
send, and also to verify up front that we can actually send it later.
+
+ //Even if we do that we would currently then need to decode it
later to set the body to send, unless we augment Proton to allow setting the
bytes directly.
+ //We will always need to decode bytes to return a snapshot from
getObject(). We will need to save the bytes somehow to support that on received
messages.
+ _parent.getMessage().setBody(new AmqpValue(serializable));
+ }
+ else //TODO: Data and AmqpSequence?
+ {
+ throw new IllegalArgumentException("Encoding this object type with
the AMQP type system is not supported: " + serializable.getClass().getName());
+ }
+
+ //TODO: ensure content type is not set (assuming we aren't using data
sections)?
+ }
+
+ private boolean isSupportedAmqpValueObjectType(Serializable serializable)
+ {
+ //TODO: augment supported types to encode as an AmqpValue?
+ return serializable instanceof Map<?,?> || serializable instanceof
List<?> || serializable.getClass().isArray();
+ }
+
+ /**
+ * Returns the deserialized object, or null if no object data is present.
+ */
+ @Override
+ public Serializable getObject() throws IllegalStateException,
ClassCastException
+ {
+ //TODO: this should actually return a snapshot of the object, so we
+ //need to save the bytes so we can return an equal/unmodified object
later
+
+ Section body = _parent.getMessage().getBody();
+ if(body == null)
+ {
+ return null;
+ }
+ else if(body instanceof AmqpValue)
+ {
+ //TODO: This is assuming the object can be immediately returned,
and is Serializable.
+ //We will actually have to ensure elements are Serializable and
e.g. convert the Uint/Ubyte etc. wrappers.
+ return (Serializable) ((AmqpValue) body).getValue();
+ }
+ else if(body instanceof Data)
+ {
+ //TODO: return as byte[]? ByteBuffer?
+ throw new UnsupportedOperationException("Data support still to be
added");
+ }
+ else if(body instanceof AmqpSequence)
+ {
+ //TODO: return as list?
+ throw new UnsupportedOperationException("AmqpSequence support
still to be added");
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected body type: " +
body.getClass().getSimpleName());
+ }
+ }
+}
Copied:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageDelegate.java
(from r1593931,
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java)
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageDelegate.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageDelegate.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java&r1=1593931&r2=1593932&rev=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageDelegate.java
Mon May 12 11:40:17 2014
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,28 +16,16 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
package org.apache.qpid.jms.engine;
import java.io.IOException;
import java.io.Serializable;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.message.Message;
-
-public abstract class AmqpObjectMessage extends AmqpMessage
+public interface AmqpObjectMessageDelegate
{
- public AmqpObjectMessage()
- {
- super();
- }
-
- public AmqpObjectMessage(Message message, Delivery delivery,
AmqpConnection amqpConnection)
- {
- super(message, delivery, amqpConnection);
- }
-
- public abstract void setObject(Serializable serializable) throws
IOException;
+ void setObject(Serializable serializable) throws IOException;
- public abstract Serializable getObject() throws IOException,
ClassNotFoundException;
+ Serializable getObject() throws IOException, ClassNotFoundException;
}
Copied:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageSerializedDelegate.java
(from r1593931,
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java)
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageSerializedDelegate.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageSerializedDelegate.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java&r1=1593931&r2=1593932&rev=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessageSerializedDelegate.java
Mon May 12 11:40:17 2014
@@ -30,24 +30,16 @@ import java.io.Serializable;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.message.Message;
-public class AmqpSerializedObjectMessage extends AmqpObjectMessage
+public class AmqpObjectMessageSerializedDelegate implements
AmqpObjectMessageDelegate
{
+ private AmqpObjectMessage _amqpObjectMessage;
public static final String CONTENT_TYPE =
"application/x-java-serialized-object";
- public AmqpSerializedObjectMessage()
+ public AmqpObjectMessageSerializedDelegate(AmqpObjectMessage
amqpObjectMessage)
{
- super();
- setContentType(CONTENT_TYPE);
+ _amqpObjectMessage = amqpObjectMessage;
}
-
- public AmqpSerializedObjectMessage(Delivery delivery, Message message,
AmqpConnection amqpConnection)
- {
- super(message, delivery, amqpConnection);
- }
-
/**
* Sets the serialized object as a data section in the underlying message,
or
* clears the body section if null.
@@ -59,7 +51,7 @@ public class AmqpSerializedObjectMessage
{
//TODO: verify whether not sending a body is ok,
//send a serialized null instead if it isn't
- getMessage().setBody(null);
+ _amqpObjectMessage.getMessage().setBody(null);
}
else
{
@@ -72,7 +64,7 @@ public class AmqpSerializedObjectMessage
byte[] bytes = baos.toByteArray();
- getMessage().setBody(new Data(new Binary(bytes)));
+ _amqpObjectMessage.getMessage().setBody(new Data(new
Binary(bytes)));
}
//TODO: ensure content type is [still] set?
@@ -86,7 +78,7 @@ public class AmqpSerializedObjectMessage
{
Binary bin = null;
- Section body = getMessage().getBody();
+ Section body = _amqpObjectMessage.getMessage().getBody();
if(body == null)
{
return null;
@@ -112,4 +104,5 @@ public class AmqpSerializedObjectMessage
return (Serializable) ois.readObject();
}
}
+
}
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
Mon May 12 11:40:17 2014
@@ -30,8 +30,10 @@ public class ClientProperties
//Custom Message Property Names
public static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
public static final String JMS_AMQP_REPLY_TO_GROUP_ID =
"JMS_AMQP_REPLY_TO_GROUP_ID";
+ public static final String JMS_AMQP_TYPED_ENCODING =
"JMS_AMQP_TYPED_ENCODING";
//Message Annotation Names
+ public static final String X_OPT_JMS_MSG_TYPE = "x-opt-jms-msg-type";
public static final String X_OPT_APP_CORRELATION_ID =
"x-opt-app-correlation-id";
public static final String X_OPT_JMS_TYPE = "x-opt-jms-type";
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Mon
May 12 11:40:17 2014
@@ -23,6 +23,7 @@ import static org.apache.qpid.jms.impl.C
import static org.apache.qpid.jms.impl.ClientProperties.JMSXUSERID;
import static org.apache.qpid.jms.impl.ClientProperties.JMSXGROUPID;
import static org.apache.qpid.jms.impl.ClientProperties.JMSXGROUPSEQ;
+import static
org.apache.qpid.jms.impl.ClientProperties.JMS_AMQP_TYPED_ENCODING;
import static org.apache.qpid.jms.impl.MessageIdHelper.JMS_ID_PREFIX;
import java.io.UnsupportedEncodingException;
@@ -61,6 +62,12 @@ public abstract class MessageImpl<T exte
*/
private Long _propJMS_AMQP_TTL = null;
+ /**
+ * Used to record the value of JMS_AMQP_TYPED_ENCODING property
+ * if it is explicitly set by the application
+ */
+ private Boolean _propJMS_AMQP_TYPED_ENCODING = null;
+
//message to be sent
public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl
connectionImpl)
{
@@ -248,6 +255,11 @@ public abstract class MessageImpl<T exte
setJMSXGroupSeq(value);
return;
}
+ else if(JMS_AMQP_TYPED_ENCODING.equals(name))
+ {
+ setJMS_AMQP_TYPED_ENCODING(value);
+ return;
+ }
checkObjectPropertyValueIsValid(value);
@@ -351,6 +363,30 @@ public abstract class MessageImpl<T exte
_amqpMessage.setReplyToGroupId(replyToGroupId);
}
+ private void setJMS_AMQP_TYPED_ENCODING(Object value) throws
MessageFormatException
+ {
+ Boolean amqpTypedEncoding = null;
+ if(value != null)
+ {
+ if(value instanceof Boolean)
+ {
+ amqpTypedEncoding = (Boolean) value;
+ }
+ else
+ {
+ throw new
QpidJmsMessageFormatException(JMS_AMQP_TYPED_ENCODING + " must be a Boolean");
+ }
+ }
+
+ notifyChangeJMS_AMQP_TYPED_ENCODING(amqpTypedEncoding);
+ _propJMS_AMQP_TYPED_ENCODING = amqpTypedEncoding;
+ }
+
+ void notifyChangeJMS_AMQP_TYPED_ENCODING(Boolean value) throws
QpidJmsMessageFormatException
+ {
+ throw new QpidJmsMessageFormatException(JMS_AMQP_TYPED_ENCODING + " is
only applicable to ObjectMessage");
+ }
+
private Object getApplicationProperty(String name) throws
MessageFormatException
{
checkPropertyNameIsValid(name);
@@ -359,6 +395,10 @@ public abstract class MessageImpl<T exte
{
return _propJMS_AMQP_TTL;
}
+ if(JMS_AMQP_TYPED_ENCODING.equals(name))
+ {
+ return _propJMS_AMQP_TYPED_ENCODING;
+ }
else if(JMSXUSERID.equals(name))
{
return getJMSXUserID();
@@ -435,6 +475,11 @@ public abstract class MessageImpl<T exte
return _propJMS_AMQP_TTL != null;
}
+ private boolean propertyExistsJMS_AMQP_TYPED_ENCODING()
+ {
+ return _propJMS_AMQP_TYPED_ENCODING != null;
+ }
+
private boolean propertyExistsJMS_AMQP_REPLY_TO_GROUP_ID()
{
return _amqpMessage.getReplyToGroupId() != null;
@@ -778,6 +823,11 @@ public abstract class MessageImpl<T exte
_amqpMessage.setGroupSequence(null);
//TODO: Clear any new custom properties.
+
+ //TODO: should this be the case?
+ //We explicitly don't clear _propJMS_AMQP_TYPED_ENCODING
+ //because it isn't really a property, just a flag to
+ //control/indicate the way the message will be output.
}
@Override
@@ -788,6 +838,11 @@ public abstract class MessageImpl<T exte
return propertyExistsJMS_AMQP_TTL();
}
+ if(JMS_AMQP_TYPED_ENCODING.equals(name))
+ {
+ return propertyExistsJMS_AMQP_TYPED_ENCODING();
+ }
+
if(JMS_AMQP_REPLY_TO_GROUP_ID.equals(name))
{
return propertyExistsJMS_AMQP_REPLY_TO_GROUP_ID();
@@ -1023,6 +1078,11 @@ public abstract class MessageImpl<T exte
propNames.add(JMS_AMQP_TTL);
}
+ if(propertyExistsJMS_AMQP_TYPED_ENCODING())
+ {
+ propNames.add(JMS_AMQP_TYPED_ENCODING);
+ }
+
if(propertyExistsJMS_AMQP_REPLY_TO_GROUP_ID())
{
propNames.add(JMS_AMQP_REPLY_TO_GROUP_ID);
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
Mon May 12 11:40:17 2014
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.jms.impl;
+import static
org.apache.qpid.jms.impl.ClientProperties.JMS_AMQP_TYPED_ENCODING;
+
import java.io.Serializable;
import javax.jms.Destination;
@@ -25,15 +27,21 @@ import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import org.apache.qpid.jms.engine.AmqpObjectMessage;
-import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
-//TODO: support requesting to send an AMQP map/list/value instead of
serialized binary data
public class ObjectMessageImpl extends MessageImpl<AmqpObjectMessage>
implements ObjectMessage
{
+ /**
+ * Value used for the X_OPT_JMS_MSG_TYPE message annotation to indicate an
ObjectMessage
+ */
+ public static final byte X_OPT_JMS_MSG_TYPE_VALUE = 1;
+
+ //TODO: add a way of controlling this: per connection and client wide?
+ private Boolean _defaultUseAmqpTypeEncoding = false;
+
//message to be sent
public ObjectMessageImpl(SessionImpl sessionImpl, ConnectionImpl
connectionImpl) throws JMSException
{
- super(new AmqpSerializedObjectMessage(), sessionImpl, connectionImpl);
+ super(new AmqpObjectMessage(), sessionImpl, connectionImpl);
}
//message just received
@@ -45,11 +53,49 @@ public class ObjectMessageImpl extends M
@Override
protected AmqpObjectMessage
prepareUnderlyingAmqpMessageForSending(AmqpObjectMessage amqpMessage)
{
- //Currently nothing to do, we [de]serialize the bytes direct to/from
the underlying message.
+ //Currently nothing to do, we [de]serialize the bytes/bodies direct
to/from the underlying message.
return amqpMessage;
+ }
- //TODO: verify we haven't been requested to send an AMQP map/list
instead of serialized binary data,
- //we might need to convert if we have been asked to (or not to).
+ @Override
+ void notifyChangeJMS_AMQP_TYPED_ENCODING(Boolean value) throws
QpidJmsMessageFormatException
+ {
+ /* TODO
+ *
+ * JMS_AMQP_TYPED_ENCODING as a means of controlling/signalling
whether an ObjectMessage is
+ * sent/received as serialized Java, or using the AMQP type system.
+ *
+ * NOTES/Questions:
+ *
+ * # We need to support converting from one type to the other with
existing content, because we can't control when another JMS provider will set
the property relative to the content.
+ *
+ * # If we don't put it in the result of getPropertyNames() then it
won't survive a 're-populate the properties' by clearing and setting them again
+ * - happens when being sent by another provider
+ * - being used by an app that wants to remove properties or add
properties to a received message even with the same provider
+ *
+ * # If we do put it in the property names, clearing the property
names either has to:
+ * - leave that special property present to keep signalling what
will happen when sending the message
+ * - clear the property and if necessary (depends on the default)
alter the encoding type of the body (which might not be cleared)
+ * - clear the property but regardless NOT alter the type of the
body (which might not be cleared)
+ *
+ * # Do we add it to the property names if the connection/client has
an [overriding] default configuration?
+ *
+ * # Do we add it to the property names for ObjectMessages which are
received with the AMQP type encoding?
+ */
+ boolean useAmqpTypeEnc =_defaultUseAmqpTypeEncoding;
+ if(value != null)
+ {
+ useAmqpTypeEnc = value;
+ }
+
+ try
+ {
+
getUnderlyingAmqpMessage(false).setUseAmqpTypeEncoding(useAmqpTypeEnc);
+ }
+ catch (Exception e)
+ {
+ throw new QpidJmsMessageFormatException("Exception setting " +
JMS_AMQP_TYPED_ENCODING, e);
+ }
}
//======= JMS Methods =======
Modified:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
(original)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
Mon May 12 11:40:17 2014
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
+import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.Message;
@@ -34,24 +35,32 @@ import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
-import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
+import org.apache.qpid.jms.engine.AmqpObjectMessageSerializedDelegate;
+import org.apache.qpid.jms.impl.ClientProperties;
+import org.apache.qpid.jms.impl.ObjectMessageImpl;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import
org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import
org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import
org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
import
org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import
org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
import
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import
org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedDataMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
import org.junit.Test;
public class ObjectMessageIntegrationTest extends QpidJmsTestCase
{
private final IntegrationTestFixture _testFixture = new
IntegrationTestFixture();
+ //==== Java serialization encoding ====
+
@Test
public void testSendBasicObjectMessageWithSerializedContent() throws
Exception
{
@@ -77,7 +86,7 @@ public class ObjectMessageIntegrationTes
MessageHeaderSectionMatcher headersMatcher = new
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new
MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propertiesMatcher = new
MessagePropertiesSectionMatcher(true);
-
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE)));
+
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE)));
TransferPayloadCompositeMatcher messageMatcher = new
TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
@@ -109,7 +118,7 @@ public class ObjectMessageIntegrationTes
Queue queue = session.createQueue("myQueue");
PropertiesDescribedType properties = new PropertiesDescribedType();
-
properties.setContentType(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE));
+
properties.setContentType(Symbol.valueOf(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE));
String expectedContent = "expectedContent";
@@ -154,7 +163,7 @@ public class ObjectMessageIntegrationTes
Queue queue = session.createQueue("myQueue");
PropertiesDescribedType properties = new PropertiesDescribedType();
-
properties.setContentType(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE));
+
properties.setContentType(Symbol.valueOf(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE));
String expectedContent = "expectedContent";
@@ -184,7 +193,7 @@ public class ObjectMessageIntegrationTes
MessageHeaderSectionMatcher headersMatcher = new
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new
MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propertiesMatcher = new
MessagePropertiesSectionMatcher(true);
-
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE)));
+
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE)));
TransferPayloadCompositeMatcher messageMatcher = new
TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
@@ -198,4 +207,84 @@ public class ObjectMessageIntegrationTes
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ //==== AMQP type system encoding ====
+
+ @Test
+ public void testSendBasicObjectMessageWithAmqpTypedContent() throws
Exception
+ {
+ try(TestAmqpPeer testPeer = new
TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ HashMap<String,String> map = new HashMap<String,String>();
+ map.put("key", "myObjectString");
+
+ MessageHeaderSectionMatcher headersMatcher = new
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new
MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propertiesMatcher = new
MessagePropertiesSectionMatcher(true);
+ //TODO: fix this, shouldn't be true for AMQP typed messages (unless they use a Data section)
+
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE)));
+ TransferPayloadCompositeMatcher messageMatcher = new
TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propertiesMatcher);
+ messageMatcher.setMessageContentMatcher(new
EncodedAmqpValueMatcher(map));
+
+ testPeer.expectTransfer(messageMatcher);
+
+ ObjectMessage message = session.createObjectMessage();
+
message.setBooleanProperty(ClientProperties.JMS_AMQP_TYPED_ENCODING, true);
+ message.setObject(map);
+
+ producer.send(message);
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test
+ public void
testRecieveBasicObjectMessageWithAmqpTypedContentAndJMSMessageTypeAnnotation()
throws Exception
+ {
+ try(TestAmqpPeer testPeer = new
TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageAnnotationsDescribedType msgAnnotations = new
MessageAnnotationsDescribedType();
+
msgAnnotations.setSymbolKeyedAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE,
UnsignedByte.valueOf(ObjectMessageImpl.X_OPT_JMS_MSG_TYPE_VALUE));
+
+ HashMap<String,String> map = new HashMap<String,String>();
+ map.put("key", "myObjectString");
+
+ DescribedType amqpValueContent = new AmqpValueDescribedType(map);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations,
null, null, amqpValueContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+ assertTrue("Expected ObjectMessage instance, but got: " +
receivedMessage.getClass().getName(), receivedMessage instanceof ObjectMessage);
+ ObjectMessage objectMessage = (ObjectMessage)receivedMessage;
+
+ Object object = objectMessage.getObject();
+ assertNotNull("Expected object but got null", object);
+ assertEquals("Message body object was not as expected", map,
object);
+ }
+ }
}
Modified:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
(original)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
Mon May 12 11:40:17 2014
@@ -92,17 +92,17 @@ public class AmqpMessageFactoryTest exte
/**
* Test that a message with no body section, but with the content type set
to
- * {@value AmqpObjectMessage#CONTENT_TYPE} results in a object message
+ * {@value AmqpObjectMessageSerializedDelegate#CONTENT_TYPE} results in a
object message
*/
@Test
public void testCreateAmqpObjectMessageFromNoBodySectionAndContentType()
throws Exception
{
//TODO: this test only required if we decide that not sending a content body is legal
Message message = Proton.message();
- message.setContentType(AmqpSerializedObjectMessage.CONTENT_TYPE);
+
message.setContentType(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE);
AmqpMessage amqpMessage =
_amqpMessageFactory.createAmqpMessage(_mockDelivery, message,
_mockAmqpConnection);
- assertEquals(AmqpSerializedObjectMessage.class, amqpMessage.getClass());
+ assertEquals(AmqpObjectMessage.class, amqpMessage.getClass());
}
/**
@@ -172,7 +172,7 @@ public class AmqpMessageFactoryTest exte
/**
* Test that a data body containing nothing, but with the content type set
to
- * {@value AmqpObjectMessage#CONTENT_TYPE} results in a object message
+ * {@value AmqpObjectMessageSerializedDelegate#CONTENT_TYPE} results in a
object message
*/
@Test
public void
testCreateAmqpObjectMessageFromDataWithEmptyBinaryAndContentType() throws
Exception
@@ -180,10 +180,10 @@ public class AmqpMessageFactoryTest exte
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
- message.setContentType(AmqpSerializedObjectMessage.CONTENT_TYPE);
+
message.setContentType(AmqpObjectMessageSerializedDelegate.CONTENT_TYPE);
AmqpMessage amqpMessage =
_amqpMessageFactory.createAmqpMessage(_mockDelivery, message,
_mockAmqpConnection);
- assertEquals(AmqpSerializedObjectMessage.class, amqpMessage.getClass());
+ assertEquals(AmqpObjectMessage.class, amqpMessage.getClass());
}
/**
Copied:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpObjectMessageTest.java
(from r1593931,
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java)
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpObjectMessageTest.java?p2=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpObjectMessageTest.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java&r1=1593931&r2=1593932&rev=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
(original)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpObjectMessageTest.java
Mon May 12 11:40:17 2014
@@ -46,7 +46,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-public class AmqpSerializedObjectMessageTest extends QpidJmsTestCase
+public class AmqpObjectMessageTest extends QpidJmsTestCase
{
private AmqpConnection _mockAmqpConnection;
private Delivery _mockDelivery;
@@ -63,7 +63,7 @@ public class AmqpSerializedObjectMessage
@Test
public void testGetObjectWithNewMessageToSendReturnsNull() throws Exception
{
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage();
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage();
assertNull("Expected null object initially",
amqpSerializedObjectMessage.getObject());
}
@@ -72,7 +72,7 @@ public class AmqpSerializedObjectMessage
public void
testGetObjectUsingReceivedMessageWithNoBodySectionReturnsNull() throws Exception
{
Message message = Proton.message();
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage(_mockDelivery, message, _mockAmqpConnection, false);
assertNull("Expected null object",
amqpSerializedObjectMessage.getObject());
}
@@ -83,7 +83,7 @@ public class AmqpSerializedObjectMessage
Message message = Proton.message();
message.setBody(new Data(null));
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage(_mockDelivery, message, _mockAmqpConnection, false);
assertNull("Expected null object",
amqpSerializedObjectMessage.getObject());
}
@@ -94,7 +94,7 @@ public class AmqpSerializedObjectMessage
Message message = Proton.message();
message.setBody(new AmqpValue("doesntMatter"));
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage(_mockDelivery, message, _mockAmqpConnection, false);
try
{
@@ -122,7 +122,7 @@ public class AmqpSerializedObjectMessage
oos.close();
byte[] bytes = baos.toByteArray();
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage();
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage();
amqpSerializedObjectMessage.setObject(content);
Message protonMessage = amqpSerializedObjectMessage.getMessage();
@@ -142,7 +142,7 @@ public class AmqpSerializedObjectMessage
Message message = Proton.message();
message.setBody(new Data(new Binary(new byte[0])));
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage(_mockDelivery, message, _mockAmqpConnection, false);
assertNotNull("Expected existing body section to be found",
message.getBody());
amqpSerializedObjectMessage.setObject(null);
@@ -160,7 +160,7 @@ public class AmqpSerializedObjectMessage
Map<String,String> origMap = new HashMap<String,String>();
origMap.put("key1", "value1");
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage();
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage();
amqpSerializedObjectMessage.setObject((Serializable) origMap);
//verify we get a different-but-equal object back
Modified:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java?rev=1593932&r1=1593931&r2=1593932&view=diff
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
(original)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
Mon May 12 11:40:17 2014
@@ -42,7 +42,7 @@ import javax.jms.ObjectMessage;
import org.apache.qpid.jms.QpidJmsTestCase;
import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpObjectMessage;
-import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
+import org.apache.qpid.jms.engine.AmqpObjectMessage;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
@@ -88,7 +88,7 @@ public class ObjectMessageImplTest exten
Message message = Proton.message();
message.setBody(new Data(new Binary(bytes)));
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage(_mockDelivery, message, _mockAmqpConnection, false);
ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
try
@@ -120,7 +120,7 @@ public class ObjectMessageImplTest exten
Message message = Proton.message();
message.setBody(new Data(new Binary(bytes)));
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage(_mockDelivery, message, _mockAmqpConnection, false);
ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
assertFalse("Message should not be writable",
objectMessageImpl.isBodyWritable());
@@ -148,7 +148,7 @@ public class ObjectMessageImplTest exten
Message message = Proton.message();
message.setBody(new Data(new Binary(bytes)));
- AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ AmqpObjectMessage amqpSerializedObjectMessage = new
AmqpObjectMessage(_mockDelivery, message, _mockAmqpConnection, false);
ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
assertNotNull("Expected body section but none was present",
message.getBody());
@@ -242,7 +242,7 @@ public class ObjectMessageImplTest exten
origMap.put("key1", "value1");
origMap.put("notSerializable", new NotSerializable());
- AmqpObjectMessage amqpSerializedObjectMessage =
Mockito.mock(AmqpSerializedObjectMessage.class);
+ AmqpObjectMessage amqpSerializedObjectMessage =
Mockito.mock(AmqpObjectMessage.class);
Mockito.when(amqpSerializedObjectMessage.getObject()).thenThrow(new
ClassNotFoundException());
ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org