Author: dkulp
Date: Tue Jul 28 19:01:30 2009
New Revision: 798654
URL: http://svn.apache.org/viewvc?rev=798654&view=rev
Log:
[CXF-2366] Patch from Marat Bedretdinov applied
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=798654&r1=798653&r2=798654&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Tue Jul 28 19:01:30 2009
@@ -164,13 +164,15 @@
String messageType = jmsConfig.getMessageType();
final javax.jms.Message jmsMessage;
Destination replyToDestination = replyTo;
- if (exchange.isOneWay() && !jmsConfig.isEnforceSpec()) {
- final String contextReplyToName =
- (headers != null) ? headers.getJMSReplyTo() : null;
- if (contextReplyToName != null) {
+ if (exchange.isOneWay() && !jmsConfig.isEnforceSpec() && isSetReplyTo(outMessage)) {
+ String replyToName = (headers != null) ? headers.getJMSReplyTo() : null;
+ if (replyToName == null && jmsConfig.getReplyDestination() != null) {
+ replyToName = jmsConfig.getReplyDestination();
+ }
+ if (replyToName != null) {
replyToDestination =
JMSFactory.resolveOrCreateDestination(jmsTemplate,
- contextReplyToName,
+ replyToName,
jmsConfig.isPubSubDomain());
}
}
@@ -321,6 +323,11 @@
this.jmsConfig = jmsConfig;
}
+ protected static boolean isSetReplyTo(Message message) {
+ Boolean ret = (Boolean)message.get(JMSConstants.JMS_SET_REPLY_TO);
+ return ret == null || (ret != null && ret.booleanValue());
+ }
+
@Override
protected void finalize() throws Throwable {
if (listener != null) {
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java?rev=798654&r1=798653&r2=798654&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java Tue Jul 28 19:01:30 2009
@@ -55,6 +55,7 @@
public static final String JMS_SERVER_CONFIG_ID = "jms-server";
public static final String JMS_REBASED_REPLY_TO =
"org.apache.cxf.jms.server.replyto";
+ public static final String JMS_SET_REPLY_TO = "org.apache.cxf.jms.client.set.replyto";
private JMSConstants() {
Modified:
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=798654&r1=798653&r2=798654&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Tue Jul 28 19:01:30 2009
@@ -24,6 +24,9 @@
import java.io.InputStream;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.cxf.BusFactory;
@@ -44,6 +47,10 @@
private static final int MAX_RECEIVE_TIME = 5;
private Message destMessage;
+ public JMSDestinationTest() {
+
+ }
+
@BeforeClass
public static void createAndStartBroker() throws Exception {
startBroker(new JMSBrokerSetup("tcp://localhost:61500"));
@@ -173,14 +180,12 @@
bus = bf.createBus("jms_test_config.xml");
BusFactory.setDefaultBus(bus);
destMessage = null;
- inMessage = null;
setupServiceInfo("http://cxf.apache.org/hello_world_jms",
"/wsdl/jms_test.wsdl",
"HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConduit conduit = setupJMSConduit(true, false);
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
JMSDestination destination = setupJMSDestination(true);
- destination.activate();
sendoutMessage(conduit, outMessage, true);
// wait for the message to be get from the destination
waitForReceiveDestMessage();
@@ -194,15 +199,12 @@
@Test
public void testOneWayDestination() throws Exception {
- destMessage = null;
- inMessage = null;
setupServiceInfo("http://cxf.apache.org/hello_world_jms",
"/wsdl/jms_test.wsdl",
"HWStaticReplyQBinMsgService",
"HWStaticReplyQBinMsgPort");
JMSConduit conduit = setupJMSConduit(true, false);
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
JMSDestination destination = setupJMSDestination(true);
- destination.activate();
sendoutMessage(conduit, outMessage, true);
// wait for the message to be get from the destination
waitForReceiveDestMessage();
@@ -214,22 +216,128 @@
destination.shutdown();
}
- private void setupMessageHeader(Message outMessage, String correlationId) {
+ @Test
+ public void testOneWayReplyToSetUnset() throws Exception {
+ /* 1. Test that replyTo destination set in WSDL is NOT used
+ * in spec compliant mode */
+
+ destMessage = null;
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
+ JMSConduit conduit = setupJMSConduit(true, false);
+ Message outMessage = new MessageImpl();
+ setupMessageHeader(outMessage);
+ JMSDestination destination = setupJMSDestination(true);
+ sendoutMessage(conduit, outMessage, true);
+ // wait for the message to be get from the destination
+ waitForReceiveDestMessage();
+ // just verify the Destination inMessage
+ assertTrue("The destiantion should have got the message ", destMessage != null);
+ verifyReplyToNotSet(destMessage);
+ destMessage = null;
+
+ /* 2. Test that replyTo destination set in WSDL IS used
+ * in spec non-compliant mode */
+
+ conduit.getJmsConfig().setEnforceSpec(false);
+ sendoutMessage(conduit, outMessage, true);
+ waitForReceiveDestMessage();
+ assertTrue("The destiantion should have got the message ", destMessage != null);
+ String exName = conduit.getJmsConfig().getReplyDestination();
+ exName = (exName.indexOf('/') != -1 && exName.indexOf('/') < exName.length())
+ ? exName.substring(exName.indexOf('/') + 1) : exName;
+ verifyReplyToSet(destMessage, Queue.class, exName);
+ destMessage = null;
+
+ /* 3. Test that replyTo destination provided via invocation context
+ * overrides the value set in WSDL and IS used in spec non-compliant mode */
+
+ String contextReplyTo = conduit.getJmsConfig().getReplyDestination() + ".context";
+ exName += ".context";
+ setupMessageHeader(outMessage, "cidValue", contextReplyTo);
+ sendoutMessage(conduit, outMessage, true);
+ waitForReceiveDestMessage();
+ assertTrue("The destiantion should have got the message ", destMessage != null);
+ verifyReplyToSet(destMessage, Queue.class, exName);
+ destMessage = null;
+
+ /* 4. Test that replyTo destination provided via invocation context
+ * and the value set in WSDL are NOT used in spec non-compliant mode
+ * when JMSConstants.JMS_SET_REPLY_TO == false */
+
+ setupMessageHeader(outMessage);
+ outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.FALSE);
+ sendoutMessage(conduit, outMessage, true);
+ waitForReceiveDestMessage();
+ assertTrue("The destiantion should have got the message ", destMessage != null);
+ verifyReplyToNotSet(destMessage);
+ destMessage = null;
+
+ /* 5. Test that replyTo destination set in WSDL IS used in spec non-compliant
+ * mode when JMSConstants.JMS_SET_REPLY_TO == true */
+
+ setupMessageHeader(outMessage);
+ outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.TRUE);
+ sendoutMessage(conduit, outMessage, true);
+ waitForReceiveDestMessage();
+ assertTrue("The destiantion should have got the message ", destMessage != null);
+ exName = conduit.getJmsConfig().getReplyDestination();
+ exName = (exName.indexOf('/') != -1 && exName.indexOf('/') < exName.length())
+ ? exName.substring(exName.indexOf('/') + 1) : exName;
+ verifyReplyToSet(destMessage, Queue.class, exName);
+ destMessage = null;
+
+ conduit.close();
+ destination.shutdown();
+ }
+
+
+ protected void verifyReplyToNotSet(Message cxfMsg) {
+ javax.jms.Message jmsMsg =
+ javax.jms.Message.class.cast(cxfMsg.get(JMSConstants.JMS_REQUEST_MESSAGE));
+ assertNotNull("JMS Messsage must be null", jmsMsg);
+ }
+
+ protected void verifyReplyToSet(Message cxfMsg,
+ Class<? extends Destination> type,
+ String name) throws Exception {
+ javax.jms.Message jmsMsg =
+ javax.jms.Message.class.cast(cxfMsg.get(JMSConstants.JMS_REQUEST_MESSAGE));
+ assertNotNull("JMS Messsage must not be null", jmsMsg);
+ assertNotNull("JMS Messsage's replyTo must not be null", jmsMsg.getJMSReplyTo());
+ assertTrue("JMS Messsage's replyTo type must be of type " + type.getName(),
+ type.isAssignableFrom(jmsMsg.getJMSReplyTo().getClass()));
+ String receivedName = null;
+ if (type == Queue.class) {
+ receivedName = ((Queue)jmsMsg.getJMSReplyTo()).getQueueName();
+ } else if (type == Topic.class) {
+ receivedName = ((Topic)jmsMsg.getJMSReplyTo()).getTopicName();
+ }
+ assertTrue("JMS Messsage's replyTo must be named " + name + " but was " + receivedName,
+ name == receivedName || receivedName.equals(name));
+
+ }
+ private void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
JMSMessageHeadersType header = new JMSMessageHeadersType();
header.setJMSCorrelationID(correlationId);
header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
header.setJMSPriority(1);
header.setTimeToLive(1000);
+ header.setJMSReplyTo(replyTo != null ? replyTo : null);
outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
outMessage.put(Message.ENCODING, "US-ASCII");
}
-
+
private void setupMessageHeader(Message outMessage) {
- setupMessageHeader(outMessage, "Destination test");
+ setupMessageHeader(outMessage, "Destination test", null);
+ }
+
+ private void setupMessageHeader(Message outMessage, String correlationId) {
+ setupMessageHeader(outMessage, correlationId, null);
}
- private void verifyReceivedMessage(Message inMessage) {
- ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
+ private void verifyReceivedMessage(Message message) {
+ ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
byte bytes[] = new byte[bis.available()];
try {
bis.read(bytes);
@@ -241,26 +349,26 @@
assertEquals("The response content should be equal",
AbstractJMSTester.MESSAGE_CONTENT, response);
}
- private void verifyRequestResponseHeaders(Message inMessage, Message outMessage) {
- JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage
+ private void verifyRequestResponseHeaders(Message msgIn, Message msgOut) {
+ JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- String inEncoding = (String) inMessage.get(Message.ENCODING);
- String outEncoding = (String) outMessage.get(Message.ENCODING);
+ String inEncoding = (String) msgIn.get(Message.ENCODING);
+ String outEncoding = (String) msgOut.get(Message.ENCODING);
assertEquals("The message encoding should be equal", inEncoding,
outEncoding);
- JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
verifyJmsHeaderEquality(outHeader, inHeader);
}
- private void verifyHeaders(Message inMessage, Message outMessage) {
- JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage
+ private void verifyHeaders(Message msgIn, Message msgOut) {
+ JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
verifyJmsHeaderEquality(outHeader, inHeader);
@@ -286,14 +394,13 @@
@Test
public void testRoundTripDestination() throws Exception {
- inMessage = null;
setupServiceInfo("http://cxf.apache.org/hello_world_jms",
"/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduit(true, false);
final Message outMessage = new MessageImpl();
setupMessageHeader(outMessage, null);
- final JMSDestination destination = setupJMSDestination(true);
+ final JMSDestination destination = setupJMSDestination(false);
// set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
@@ -342,7 +449,6 @@
final String customPropertyName =
"THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
- inMessage = null;
setupServiceInfo("http://cxf.apache.org/hello_world_jms",
"/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
// set up the conduit send to be true
@@ -358,7 +464,7 @@
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
headers.getProperty().add(excludeProp);
- final JMSDestination destination = setupJMSDestination(true);
+ final JMSDestination destination = setupJMSDestination(false);
// set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
@@ -407,7 +513,6 @@
@Test
public void testIsMultiplexCapable() throws Exception {
- inMessage = null;
setupServiceInfo("http://cxf.apache.org/hello_world_jms",
"/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
final JMSDestination destination = setupJMSDestination(true);
@@ -417,7 +522,6 @@
@Test
public void testSecurityContext() throws Exception {
- inMessage = null;
setupServiceInfo("http://cxf.apache.org/hello_world_jms",
"/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
final JMSDestination destination = setupJMSDestination(true);