Author: dkulp
Date: Sat Dec 19 13:06:05 2009
New Revision: 892469
URL: http://svn.apache.org/viewvc?rev=892469&view=rev
Log:
Restore Async methods over JMS to actually be async and not consume a
bunch of threads.
Added:
cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml (with
props)
Modified:
cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl
Modified:
cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
---
cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
(original)
+++
cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
Sat Dec 19 13:06:05 2009
@@ -23,8 +23,11 @@
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
+import java.util.concurrent.Future;
import java.util.logging.Logger;
+import javax.xml.ws.Response;
+
import org.apache.cxf.common.i18n.Message;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
@@ -49,8 +52,16 @@
int i = 0;
for (Method m : methods) {
try {
- newMethods[i++] = getImplementationMethod(m);
+ newMethods[i] = getImplementationMethod(m);
+ i++;
} catch (NoSuchMethodException e) {
+ if (m.getName().endsWith("Async")
+ && (Future.class.equals(m.getReturnType())
+ || Response.class.equals(m.getReturnType()))) {
+ newMethods[i] = m;
+ i++;
+ continue;
+ }
Class endpointClass = implInfo.getImplementorClass();
Message msg = new Message("SEI_METHOD_NOT_FOUND", LOG,
m.getName(),
endpointClass.getName());
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
---
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
(original)
+++
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Sat Dec 19 13:06:05 2009
@@ -22,15 +22,22 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.lang.ref.WeakReference;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
import javax.jms.Session;
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.ConfigurationException;
import org.apache.cxf.message.Exchange;
@@ -39,9 +46,13 @@
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
/**
* JMSConduit is instantiated by the JMSTransportfactory which is selected by
a client if the transport
@@ -49,7 +60,7 @@
* a JMS destination. If the Exchange is not oneway it then recevies the
response and converts it to a CXF
* Message. This is then provided in the Exchange and also sent to the
incomingObserver
*/
-public class JMSConduit extends AbstractConduit implements JMSExchangeSender {
+public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
MessageListener {
static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
@@ -57,11 +68,21 @@
private EndpointInfo endpointInfo;
private JMSConfiguration jmsConfig;
+ private Map<String, Exchange> correlationMap = new
ConcurrentHashMap<String, Exchange>();
+ private DefaultMessageListenerContainer jmsListener;
+ private DefaultMessageListenerContainer allListener;
private String conduitId;
private AtomicLong messageCount;
-
- public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target,
JMSConfiguration jmsConfig) {
+ private int outstandingAsync;
+ private JMSBusLifeCycleListener listener;
+ private Bus bus;
+
+ public JMSConduit(EndpointInfo endpointInfo,
+ EndpointReferenceType target,
+ JMSConfiguration jmsConfig,
+ Bus b) {
super(target);
+ bus = b;
this.jmsConfig = jmsConfig;
this.endpointInfo = endpointInfo;
conduitId = UUID.randomUUID().toString().replaceAll("-", "");
@@ -82,6 +103,31 @@
JMSOutputStream out = new JMSOutputStream(this, message.getExchange(),
isTextPayload);
message.setContent(OutputStream.class, out);
}
+
+ private synchronized AbstractMessageListenerContainer getJMSListener() {
+ if (jmsListener == null) {
+ jmsListener = JMSFactory.createJmsListener(jmsConfig,
+ this,
+
jmsConfig.getReplyDestination(),
+ conduitId,
+ false);
+ addBusListener();
+ }
+ ++outstandingAsync;
+ return jmsListener;
+ }
+ private synchronized AbstractMessageListenerContainer getAllListener() {
+ if (allListener == null) {
+ allListener = JMSFactory.createJmsListener(jmsConfig,
+ this,
+
jmsConfig.getReplyDestination(),
+ null,
+ true);
+ addBusListener();
+ }
+ ++outstandingAsync;
+ return allListener;
+ }
/**
* Send the JMS Request out and if not oneWay receive the response
@@ -126,8 +172,9 @@
if (userCID != null) {
correlationId = userCID;
} else if (!jmsConfig.isSetConduitSelectorPrefix()
- && (!jmsConfig.isSetUseConduitIdSelector() || !jmsConfig
- .isUseConduitIdSelector())) {
+ && (exchange.isSynchronous() || exchange.isOneWay())
+ && (!jmsConfig.isSetUseConduitIdSelector()
+ || !jmsConfig.isUseConduitIdSelector())) {
messageIdPattern = true;
} else {
if (jmsConfig.isUseConduitIdSelector()) {
@@ -145,10 +192,18 @@
Destination replyToDestination = null;
if (!exchange.isOneWay() || !jmsConfig.isEnforceSpec() &&
isSetReplyTo(outMessage)
&& replyTo != null) {
- replyToDestination =
JMSFactory.resolveOrCreateDestination(jmsTemplate, replyTo,
-
jmsConfig.isPubSubDomain());
+ if (exchange.isSynchronous() || exchange.isOneWay()) {
+ replyToDestination =
JMSFactory.resolveOrCreateDestination(jmsTemplate, replyTo,
+
jmsConfig.isPubSubDomain());
+ } else {
+ if (userCID == null || !jmsConfig.isUseConduitIdSelector()) {
+ replyToDestination = getJMSListener().getDestination();
+ } else {
+ replyToDestination = getAllListener().getDestination();
+ }
+ }
}
-
+
final String cid = correlationId;
final Destination rtd = replyToDestination;
class JMSConduitMessageCreator implements MessageCreator {
@@ -159,6 +214,9 @@
jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(jmsConfig,
outMessage, request,
messageType, session, rtd,
cid);
+ if (!exchange.isSynchronous() && !exchange.isOneWay()) {
+ correlationMap.put(cid, exchange);
+ }
LOG.log(Level.FINE, "client sending request: ", jmsMessage);
return jmsMessage;
}
@@ -186,15 +244,17 @@
correlationId = messageCreator.getMessageID();
}
headers.setJMSMessageID(messageCreator.getMessageID());
-
- String messageSelector = "JMSCorrelationID = '" +
correlationId + "'";
- javax.jms.Message replyMessage =
jmsTemplate.receiveSelected(replyToDestination,
-
messageSelector);
- if (replyMessage == null) {
- throw new RuntimeException("Timeout receiving message with
correlationId "
- + correlationId);
- } else {
- doReplyMessage(exchange, replyMessage);
+
+ final String messageSelector = "JMSCorrelationID = '" +
correlationId + "'";
+ if (exchange.isSynchronous()) {
+ javax.jms.Message replyMessage =
jmsTemplate.receiveSelected(replyToDestination,
+
messageSelector);
+ if (replyMessage == null) {
+ throw new RuntimeException("Timeout receiving message
with correlationId "
+ + correlationId);
+ } else {
+ doReplyMessage(exchange, replyMessage);
+ }
}
}
} else {
@@ -203,6 +263,85 @@
}
}
+ static class JMSBusLifeCycleListener implements BusLifeCycleListener {
+ final WeakReference<JMSConduit> ref;
+ BusLifeCycleManager blcm;
+ JMSBusLifeCycleListener(JMSConduit c, BusLifeCycleManager b) {
+ ref = new WeakReference<JMSConduit>(c);
+ blcm = b;
+ blcm.registerLifeCycleListener(this);
+ }
+
+ public void initComplete() {
+ }
+
+ public void postShutdown() {
+ }
+
+ public void preShutdown() {
+ unreg();
+ blcm = null;
+ JMSConduit c = ref.get();
+ if (c != null) {
+ c.listener = null;
+ c.close();
+ }
+ }
+ public void unreg() {
+ if (blcm != null) {
+ blcm.unregisterLifeCycleListener(this);
+ }
+ }
+ }
+ private synchronized void addBusListener() {
+ if (listener == null && bus != null) {
+ BusLifeCycleManager blcm =
bus.getExtension(BusLifeCycleManager.class);
+ if (blcm != null) {
+ listener = new JMSBusLifeCycleListener(this,
+ blcm);
+ }
+ }
+ }
+
+ /**
+ * When a message is received on the reply destination the correlation map
is searched for the
+ * correlationId. If it is found the message is converted to a CXF message
and the thread sending the
+ * request is notified {...@inheritdoc}
+ */
+ public void onMessage(javax.jms.Message jmsMessage) {
+ String correlationId;
+ try {
+ correlationId = jmsMessage.getJMSCorrelationID();
+ } catch (JMSException e) {
+ throw JmsUtils.convertJmsAccessException(e);
+ }
+
+ Exchange exchange = correlationMap.remove(correlationId);
+ if (exchange == null) {
+ LOG.log(Level.WARNING, "Could not correlate message with
correlationId " + correlationId);
+ return;
+ }
+ doReplyMessage(exchange, jmsMessage);
+ maybeShutdownListeners();
+ }
+ private synchronized void maybeShutdownListenersInternal() {
+ if (outstandingAsync == 0) {
+ shutdownListeners();
+ }
+ }
+ private synchronized void maybeShutdownListeners() {
+ if (outstandingAsync > 0) {
+ --outstandingAsync;
+ }
+ if (outstandingAsync == 0) {
+
bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue().execute(new
Runnable() {
+ public void run() {
+ maybeShutdownListenersInternal();
+ }
+ });
+ }
+ }
+
/**
* Here we just deal with the reply message
*/
@@ -222,7 +361,6 @@
}
}
- //REVISIT: put on a workqueue?
if (incomingObserver != null) {
incomingObserver.onMessage(exchange.getInMessage());
}
@@ -231,7 +369,22 @@
}
}
- public void close() {
+ public synchronized void shutdownListeners() {
+ if (listener != null) {
+ listener.unreg();
+ listener = null;
+ }
+ if (jmsListener != null) {
+ jmsListener.shutdown();
+ jmsListener = null;
+ }
+ if (allListener != null) {
+ allListener.shutdown();
+ allListener = null;
+ }
+ }
+ public synchronized void close() {
+ shutdownListeners();
jmsConfig.destroyWrappedConnectionFactory();
LOG.log(Level.FINE, "JMSConduit closed ");
}
@@ -252,10 +405,11 @@
Boolean ret = (Boolean)message.get(JMSConstants.JMS_SET_REPLY_TO);
return ret == null || (ret != null && ret.booleanValue());
}
-
+
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
+
}
\ No newline at end of file
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
---
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
(original)
+++
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
Sat Dec 19 13:06:05 2009
@@ -160,14 +160,59 @@
return createJmsListener(jmsListener,
jmsConfig,
listenerHandler,
- destinationName);
+ destinationName,
+ null, null, false);
+ }
+ /**
+ * Create and start listener using configuration information from
jmsConfig. Uses
+ * resolveOrCreateDestination to determine the destination for the
listener.
+ *
+ * @param jmsConfig configuration information
+ * @param listenerHandler object to be called when a message arrives
+ * @param destinationName null for temp dest or a destination name
+ * @param messageSelectorPrefix prefix for the messageselector
+ * @return
+ */
+ public static DefaultMessageListenerContainer
createJmsListener(JMSConfiguration jmsConfig,
+
MessageListener listenerHandler,
+
Destination destination,
+ String
messageSelectorPrefix,
+ boolean
userCID) {
+ DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
+ ? new DefaultMessageListenerContainer() : new
DefaultMessageListenerContainer102();
+
+ return createJmsListener(jmsListener,
+ jmsConfig,
+ listenerHandler,
+ null,
+ destination,
+ messageSelectorPrefix,
+ userCID);
+ }
+ public static DefaultMessageListenerContainer
createJmsListener(JMSConfiguration jmsConfig,
+
MessageListener listenerHandler,
+ String
destination,
+ String
messageSelectorPrefix,
+ boolean
userCID) {
+ DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
+ ? new DefaultMessageListenerContainer() : new
DefaultMessageListenerContainer102();
+
+ return createJmsListener(jmsListener,
+ jmsConfig,
+ listenerHandler,
+ destination,
+ null,
+ messageSelectorPrefix,
+ userCID);
}
-
public static DefaultMessageListenerContainer createJmsListener(
DefaultMessageListenerContainer jmsListener,
JMSConfiguration jmsConfig,
MessageListener listenerHandler,
- String destinationName) {
+ String destinationName,
+ Destination destination,
+ String messageSelectorPrefix,
+ boolean userCID) {
jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
@@ -202,18 +247,24 @@
jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping());
}
String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
- if (staticSelectorPrefix.length() > 0) {
+ if (!userCID && messageSelectorPrefix != null &&
jmsConfig.isUseConduitIdSelector()) {
+ jmsListener.setMessageSelector("JMSCorrelationID LIKE '"
+ + staticSelectorPrefix
+ + messageSelectorPrefix + "%'");
+ } else if (staticSelectorPrefix.length() > 0) {
jmsListener.setMessageSelector("JMSCorrelationID LIKE '"
+ staticSelectorPrefix + "%'");
}
+
if (jmsConfig.getDestinationResolver() != null) {
jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
}
if (jmsConfig.getTaskExecutor() != null) {
jmsListener.setTaskExecutor(jmsConfig.getTaskExecutor());
}
-
- if (jmsConfig.isAutoResolveDestination()) {
+ if (destination != null) {
+ jmsListener.setDestination(destination);
+ } else if (jmsConfig.isAutoResolveDestination()) {
jmsListener.setDestinationName(destinationName);
} else {
JmsTemplate jmsTemplate = createJmsTemplate(jmsConfig, null);
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
---
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
(original)
+++
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
Sat Dec 19 13:06:05 2009
@@ -68,7 +68,7 @@
public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType
target) throws IOException {
JMSOldConfigHolder old = new JMSOldConfigHolder();
JMSConfiguration jmsConf =
old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true);
- return new JMSConduit(endpointInfo, target, jmsConf);
+ return new JMSConduit(endpointInfo, target, jmsConf, bus);
}
/**
Modified:
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
---
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
(original)
+++
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
Sat Dec 19 13:06:05 2009
@@ -117,7 +117,7 @@
JMSConfiguration jmsConfig = new JMSOldConfigHolder()
.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true);
- JMSConduit jmsConduit = new JMSConduit(endpointInfo, target,
jmsConfig);
+ JMSConduit jmsConduit = new JMSConduit(endpointInfo, target,
jmsConfig, bus);
if (send) {
// setMessageObserver
observer = new MessageObserver() {
Modified:
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
---
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
(original)
+++
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
Sat Dec 19 13:06:05 2009
@@ -25,14 +25,18 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import javax.activation.DataHandler;
import javax.jms.DeliveryMode;
import javax.xml.namespace.QName;
+import javax.xml.ws.AsyncHandler;
import javax.xml.ws.Binding;
import javax.xml.ws.BindingProvider;
import javax.xml.ws.Endpoint;
import javax.xml.ws.Holder;
+import javax.xml.ws.Response;
import javax.xml.ws.soap.SOAPBinding;
import javax.xml.ws.soap.SOAPFaultException;
@@ -153,8 +157,7 @@
fail("Should have thrown FaultException");
} catch (PingMeFault ex) {
assertNotNull(ex.getFaultInfo());
- }
-
+ }
}
} catch (UndeclaredThrowableException ex) {
throw (Exception)ex.getCause();
@@ -205,6 +208,117 @@
}
@Test
+ public void testAsyncCall() throws Exception {
+ QName serviceName = getServiceName(new
QName("http://cxf.apache.org/hello_world_jms",
+ "HelloWorldService"));
+ QName portName = getPortName(new
QName("http://cxf.apache.org/hello_world_jms", "HelloWorldPort"));
+ URL wsdl = getWSDLURL("/wsdl/jms_test.wsdl");
+ assertNotNull(wsdl);
+
+ HelloWorldService service = new HelloWorldService(wsdl, serviceName);
+ assertNotNull(service);
+ HelloWorldPortType greeter = service.getPort(portName,
HelloWorldPortType.class);
+ final Thread thread = Thread.currentThread();
+
+ class TestAsyncHandler implements AsyncHandler<String> {
+ String expected;
+
+ public TestAsyncHandler(String x) {
+ expected = x;
+ }
+
+ public String getExpected() {
+ return expected;
+ }
+ public void handleResponse(Response<String> response) {
+ try {
+ Thread thread2 = Thread.currentThread();
+ assertNotSame(thread, thread2);
+ assertEquals("Hello " + expected, response.get());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ TestAsyncHandler h1 = new TestAsyncHandler("Homer");
+ TestAsyncHandler h2 = new TestAsyncHandler("Maggie");
+ TestAsyncHandler h3 = new TestAsyncHandler("Bart");
+ TestAsyncHandler h4 = new TestAsyncHandler("Lisa");
+ TestAsyncHandler h5 = new TestAsyncHandler("Marge");
+
+ Future<?> f1 = greeter.greetMeAsync("Santa's Little Helper",
+ new TestAsyncHandler("Santa's
Little Helper"));
+ f1.get();
+ f1 = greeter.greetMeAsync("PauseForTwoSecs Santa's Little Helper",
+ new TestAsyncHandler("Santa's Little
Helper"));
+ long start = System.currentTimeMillis();
+ f1 = greeter.greetMeAsync("PauseForTwoSecs " + h1.getExpected(), h1);
+ Future<?> f2 = greeter.greetMeAsync("PauseForTwoSecs " +
h2.getExpected(), h2);
+ Future<?> f3 = greeter.greetMeAsync("PauseForTwoSecs " +
h3.getExpected(), h3);
+ Future<?> f4 = greeter.greetMeAsync("PauseForTwoSecs " +
h4.getExpected(), h4);
+ Future<?> f5 = greeter.greetMeAsync("PauseForTwoSecs " +
h5.getExpected(), h5);
+ long mid = System.currentTimeMillis();
+ assertEquals("Hello " + h1.getExpected(), f1.get());
+ assertEquals("Hello " + h2.getExpected(), f2.get());
+ assertEquals("Hello " + h3.getExpected(), f3.get());
+ assertEquals("Hello " + h4.getExpected(), f4.get());
+ assertEquals("Hello " + h5.getExpected(), f5.get());
+ long end = System.currentTimeMillis();
+
+ assertTrue("Time too long: " + (mid - start), (mid - start) < 1000);
+ assertTrue((end - mid) > 1000);
+ f1 = null;
+ f2 = null;
+ f3 = null;
+ f4 = null;
+ f5 = null;
+
+ /*
+ int count = 20;
+ TestAsyncHandler handlers[] = new TestAsyncHandler[count];
+ Future<?> futures[] = new Future<?>[count];
+ for (int x = 0; x < count; x++) {
+ handlers[x] = new TestAsyncHandler("Handler" + x);
+ futures[x] = greeter.greetMeAsync("PauseForTwoSecs " +
handlers[x].getExpected(),
+ handlers[x]);
+ //intersperse some sync calls in there....
+ if (x == 2 || x == 5) {
+ assertEquals("Hello World", greeter.greetMe("World"));
+ }
+ if (x == 10) {
+ assertEquals("Hello World", greeter.greetMe("PauseForTwoSecs
World"));
+ }
+ }
+ int countDone = 0;
+ for (int x = 0; x < count; x++) {
+ if (futures[x].isDone()) {
+ countDone++;
+ }
+ }
+ assertTrue("Should not all be done.", countDone < count);
+ for (int x = 0; x < count; x++) {
+ assertEquals("Hello " + handlers[x].getExpected(),
futures[x].get());
+ }
+ countDone = 0;
+ for (int x = 0; x < count; x++) {
+ if (futures[x].isDone()) {
+ countDone++;
+ }
+ }
+ assertEquals(count, countDone);
+ */
+
+ greeter = null;
+ service = null;
+
+ System.gc();
+ System.gc();
+ System.gc();
+ }
+
+ @Test
public void testBasicConnection() throws Exception {
QName serviceName = getServiceName(new
QName("http://cxf.apache.org/hello_world_jms",
"HelloWorldService"));
@@ -243,11 +357,10 @@
assertNotNull(nslf.getFaultInfo());
assertNotNull(nslf.getFaultInfo().getCode());
}
+
}
} catch (UndeclaredThrowableException ex) {
throw (Exception)ex.getCause();
- } catch (Exception t) {
- throw t;
}
}
Modified:
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
---
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
(original)
+++
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
Sat Dec 19 13:06:05 2009
@@ -18,7 +18,11 @@
*/
package org.apache.cxf.systest.jms;
+import java.util.concurrent.Future;
+
import javax.annotation.Resource;
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
import javax.xml.ws.WebServiceContext;
import javax.xml.ws.handler.MessageContext;
@@ -38,6 +42,15 @@
@Resource
protected WebServiceContext wsContext;
public String greetMe(String me) {
+ if (me.startsWith("PauseForTwoSecs")) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ me = me.substring("PauseForTwoSecs".length()).trim();
+ }
+
MessageContext mc = wsContext.getMessageContext();
JMSMessageHeadersType headers =
(JMSMessageHeadersType)
mc.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
@@ -86,4 +99,34 @@
return new TestRpcLitFaultResponse();
}
+ public Response<String> greetMeAsync(String stringParam0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Future<?> greetMeAsync(String stringParam0, AsyncHandler<String>
asyncHandler) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Response<String> sayHiAsync() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Future<?> sayHiAsync(AsyncHandler<String> asyncHandler) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Response<TestRpcLitFaultResponse> testRpcLitFaultAsync(String in) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Future<?> testRpcLitFaultAsync(String in,
AsyncHandler<TestRpcLitFaultResponse> asyncHandler) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
Added: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
URL:
http://svn.apache.org/viewvc/cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml?rev=892469&view=auto
==============================================================================
--- cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml (added)
+++ cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml Sat Dec 19
13:06:05 2009
@@ -0,0 +1,27 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<bindings
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
+ wsdlLocation="jms_test.wsdl"
+ xmlns="http://java.sun.com/xml/ns/jaxws">
+ <bindings node="wsdl:definitions">
+ <enableAsyncMapping>true</enableAsyncMapping>
+ </bindings>
+</bindings>
\ No newline at end of file
Propchange: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Modified: cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl
URL:
http://svn.apache.org/viewvc/cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl (original)
+++ cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl Sat Dec 19
13:06:05 2009
@@ -347,8 +347,8 @@
<jms:JMSNamingProperty name="java.naming.factory.initial"
value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
<jms:JMSNamingProperty name="java.naming.provider.url"
value="tcp://localhost:61500"/>
</jms:address>
-
<jms:server durableSubscriberName="CXF_subscriber"/>
+ <jms:sessionPool hightWaterMark="5" lowWaterMark="5"/>
</port>
</service>