Author: hadrian
Date: Fri Dec 14 06:17:54 2007
New Revision: 604202
URL: http://svn.apache.org/viewvc?rev=604202&view=rev
Log:
Patch for CAMEL-261. Applied with thanks!
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapComponent.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/WSDLSoapServiceFactoryBean.java
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
Fri Dec 14 06:17:54 2007
@@ -16,13 +16,22 @@
*/
package org.apache.camel.component.cxf;
+import org.apache.camel.CamelException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.cxf.helpers.XMLUtils;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+
/**
* The binding of how Camel messages get mapped to Apache CXF and back again
*
@@ -68,6 +77,15 @@
answer.setContent(List.class, body);
//just set the method name
answer.setContent(String.class,
in.getHeader(CxfConstants.OPERATION_NAME));
+ } else if (body instanceof DOMSource) {
+ DOMSource source = (DOMSource) body;
+ try {
+ ByteArrayInputStream bais = new
ByteArrayInputStream(XMLUtils.toString(source).getBytes());
+ answer.setContent(InputStream.class, bais);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+
}
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
Fri Dec 14 06:17:54 2007
@@ -89,7 +89,9 @@
}
-
+ public void setExchange(Exchange exchange) {
+ this.exchange = exchange;
+ }
// Expose CXF APIs directly on the exchange
//-------------------------------------------------------------------------
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
Fri Dec 14 06:17:54 2007
@@ -25,6 +25,7 @@
import org.apache.camel.component.cxf.invoker.CxfClient;
import org.apache.camel.component.cxf.invoker.CxfClientFactoryBean;
import org.apache.camel.component.cxf.invoker.InvokingContext;
+import org.apache.camel.component.cxf.invoker.InvokingContextFactory;
import org.apache.camel.component.cxf.spring.CxfEndpointBean;
import org.apache.camel.component.cxf.util.CxfEndpointUtils;
import org.apache.camel.impl.DefaultProducer;
@@ -38,6 +39,7 @@
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.frontend.ClientFactoryBean;
+import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.BindingOperationInfo;
@@ -177,7 +179,20 @@
} else {
// get the invocation context
org.apache.cxf.message.Exchange ex = exchange.getExchange();
+ if (ex == null) {
+ ex = (org.apache.cxf.message.Exchange)
exchange.getProperty(CxfConstants.CXF_EXCHANGE);
+ exchange.setExchange(ex);
+ }
+ if (ex == null) {
+ ex = new ExchangeImpl();
+ exchange.setExchange(ex);
+ }
+ assert ex != null;
InvokingContext invokingContext =
ex.get(InvokingContext.class);
+ if (invokingContext == null) {
+ invokingContext =
InvokingContextFactory.createContext(dataFormat);
+ ex.put(InvokingContext.class, invokingContext);
+ }
Object params = invokingContext.getRequestContent(inMessage);
// invoke the stream message with the exchange context
CxfClient cxfClient = (CxfClient) client;
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java
Fri Dec 14 06:17:54 2007
@@ -16,8 +16,11 @@
*/
package org.apache.camel.component.cxf;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.CharBuffer;
import java.util.List;
import java.util.Map;
@@ -54,6 +57,23 @@
Object body = message.getBody(InputStream.class);
if (body == null) {
body = message.getBody();
+ }
+ if (body instanceof BufferedReader) {
+ //do transform from BufferedReader to InputStream
+
+ try {
+ BufferedReader reader = (BufferedReader)body;
+ String line;
+ String content = "";
+ while ((line = reader.readLine()) != null) {
+ content = content + line;
+ }
+ ByteArrayInputStream bais = new
ByteArrayInputStream(content.getBytes());
+ answer.setContent(InputStream.class, bais);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
if (body instanceof InputStream) {
answer.setContent(InputStream.class, body);
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapComponent.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapComponent.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapComponent.java
Fri Dec 14 06:17:54 2007
@@ -44,5 +44,10 @@
soapEndpoint.init();
return soapEndpoint;
}
+
+ @Override
+ protected boolean useIntrospectionOnEndpoint() {
+ return false;
+ }
}
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
Fri Dec 14 06:17:54 2007
@@ -19,6 +19,7 @@
import java.io.OutputStream;
import javax.xml.transform.Source;
+import javax.xml.transform.dom.DOMSource;
import org.apache.camel.*;
import org.apache.camel.component.cxf.util.CxfEndpointUtils;
@@ -33,6 +34,7 @@
import org.apache.cxf.endpoint.ClientImpl;
import org.apache.cxf.frontend.ClientFactoryBean;
import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.helpers.XMLUtils;
import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.interceptor.OutgoingChainInterceptor;
import org.apache.cxf.io.CachedOutputStream;
@@ -154,8 +156,8 @@
chain.doIntercept(outMessage);
CachedOutputStream outputStream =
(CachedOutputStream)outMessage.getContent(OutputStream.class);
-
exchange.getOut().setBody(outputStream.getInputStream());
+ exchange.getIn().setBody(outputStream.getInputStream());
}
}
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
Fri Dec 14 06:17:54 2007
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
@@ -25,12 +26,16 @@
import org.apache.cxf.Bus;
import org.apache.cxf.binding.Binding;
+import org.apache.cxf.binding.soap.interceptor.ReadHeadersInterceptor;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.ClientImpl;
import org.apache.cxf.endpoint.ConduitSelector;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.endpoint.EndpointImpl;
import org.apache.cxf.endpoint.PreexistingConduitSelector;
import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
@@ -39,6 +44,7 @@
import org.apache.cxf.service.model.BindingInfo;
import org.apache.cxf.service.model.BindingMessageInfo;
import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.service.model.OperationInfo;
import org.apache.cxf.transport.MessageObserver;
/**
@@ -125,7 +131,9 @@
// setup conduit selector
prepareConduitSelector(message);
+ modifyChain(chain, null);
// execute chain
+
chain.doIntercept(message);
//it will close all the stream in the message, so we do not call it
@@ -142,8 +150,12 @@
}
if (!exchange.isOneWay()) {
+
+ synchronized (exchange) {
+ waitResponse(exchange);
+ }
ex = getException(exchange);
-
+
if (ex != null) {
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Exception in incoming chain: " + ex.toString());
@@ -156,42 +168,78 @@
return retval;
}
+
+ private void waitResponse(Exchange exchange) {
+ int remaining = synchronousTimeout;
+ while (!Boolean.TRUE.equals(exchange.get(FINISHED)) && remaining > 0) {
+ long start = System.currentTimeMillis();
+ try {
+ exchange.wait(remaining);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ long end = System.currentTimeMillis();
+ remaining -= (int)(end - start);
+ }
+ if (!Boolean.TRUE.equals(exchange.get(FINISHED))) {
+ LogUtils.log(LOG, Level.WARNING, "RESPONSE_TIMEOUT",
+ exchange.get(OperationInfo.class).getName().toString());
+ }
+ }
public void onMessage(Message message) {
Exchange exchange = message.getExchange();
+
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("call the cxf client on message , exchange is " +
exchange);
}
- if (exchange.get(InvokingContext.class) == null) {
- super.onMessage(message);
- } else {
-
- message = getEndpoint().getBinding().createMessage(message);
- message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
- message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
-
- exchange.put(Binding.class, getEndpoint().getBinding());
- BindingOperationInfo bi =
exchange.get(BindingOperationInfo.class);
- if (bi != null) {
- //Set The OutputMessage
- exchange.put(BindingMessageInfo.class, bi.getOutput());
- }
- InvokingContext invokingContext =
exchange.get(InvokingContext.class);
- assert invokingContext != null;
-
- // setup interceptor chain
- PhaseInterceptorChain chain =
invokingContext.getResponseInInterceptorChain(exchange);
- message.setInterceptorChain(chain);
-
- // execute chain
- chain.doIntercept(message);
+
+ try {
+
+ if (exchange.get(InvokingContext.class) == null) {
+ super.onMessage(message);
+ } else {
+
+ message = getEndpoint().getBinding().createMessage(message);
+ message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+ message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
+
+ exchange.put(Binding.class, getEndpoint().getBinding());
+ BindingOperationInfo bi =
exchange.get(BindingOperationInfo.class);
+ if (bi != null) {
+ //Set The OutputMessage
+ exchange.put(BindingMessageInfo.class, bi.getOutput());
+ }
+ InvokingContext invokingContext =
exchange.get(InvokingContext.class);
+ assert invokingContext != null;
+
+ // setup interceptor chain
+ PhaseInterceptorChain chain =
invokingContext.getResponseInInterceptorChain(exchange);
+ message.setInterceptorChain(chain);
+ modifyChain(chain, null);
+ // execute chain
+ chain.doIntercept(message);
- // set inMessage in the exchange
- exchange.setInMessage(message);
+ // set inMessage in the exchange
+ exchange.setInMessage(message);
+ }
+
+ } finally {
+ synchronized (message.getExchange()) {
+ if (!isPartialResponse(message)) {
+ message.getExchange().put(FINISHED,
Boolean.TRUE);
+ message.getExchange().setInMessage(message);
+ message.getExchange().notifyAll();
+ }
+ }
}
}
+ private boolean isPartialResponse(Message in) {
+ return Boolean.TRUE.equals(in.get(Message.PARTIAL_RESPONSE_MESSAGE));
+ }
+
private Message prepareMessage(Exchange exchange, Map<String, Object>
requestContext,
Object param, InvokingContext InvokingContext) {
@@ -229,6 +277,8 @@
chain.setFaultObserver(outFaultObserver);
return chain;
}
+
+
public Endpoint getEndpoint() {
return endpoint;
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/WSDLSoapServiceFactoryBean.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/WSDLSoapServiceFactoryBean.java?rev=604202&r1=604201&r2=604202&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/WSDLSoapServiceFactoryBean.java
(original)
+++
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/util/WSDLSoapServiceFactoryBean.java
Fri Dec 14 06:17:54 2007
@@ -46,7 +46,8 @@
@Override
public Service create() {
- WSDLServiceFactory factory = new WSDLServiceFactory(getBus(),
getWsdlURL(), null);
+ WSDLServiceFactory factory = new WSDLServiceFactory(getBus(),
getWsdlURL(), getServiceQName());
+
setService(factory.create());
initializeSoapInterceptors();
//disable the date interceptors