Author: jstrachan
Date: Tue Mar 27 06:41:09 2007
New Revision: 522906
URL: http://svn.apache.org/viewvc?view=rev&rev=522906
Log:
a rough initial spike attempting to port the JMS transport for CXF to Camel
Added:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
(with props)
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
(with props)
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
(with props)
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
(with props)
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
(with props)
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
(with props)
Added:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?view=auto&rev=522906
==============================================================================
---
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
(added)
+++
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,283 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.configuration.Configurable;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @version $Revision$
+ */
+public class CamelConduit extends AbstractConduit implements Configurable {
+ protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
+ private static final Logger LOG =
LogUtils.getL7dLogger(CamelConduit.class);
+ protected final CamelTransportBase base;
+ private String targetCamelEndpointUri;
+/*
+ protected ClientConfig clientConfig;
+ protected ClientBehaviorPolicyType runtimePolicy;
+ protected AddressType address;
+ protected SessionPoolType sessionPool;
+*/
+
+    /**
+     * Creates a conduit that sends CXF messages through Camel.
+     *
+     * @param camelContext the Camel context handed to the shared {@link CamelTransportBase}
+     * @param bus the CXF bus handed to the shared transport base
+     * @param endpointInfo the endpoint description handed to the shared transport base
+     * @param targetReference the target endpoint reference passed to the superclass
+     */
+    public CamelConduit(CamelContext camelContext,
+                        Bus bus,
+                        EndpointInfo endpointInfo,
+                        EndpointReferenceType targetReference) {
+        super(targetReference);
+
+        this.base = new CamelTransportBase(camelContext, bus, endpointInfo, false, BASE_BEAN_NAME_SUFFIX);
+
+        // Configuration runs last because it reads the bus through the transport base.
+        initConfig();
+    }
+
+    /**
+     * Prepares the message for sending; nothing is transmitted here. The message is given a
+     * {@link JMSOutputStream}, whose {@code doClose()} hook performs the actual send.
+     *
+     * @param message the outbound CXF message
+     * @throws IOException declared on the signature; the visible code does not throw it
+     */
+    public void send(Message message) throws IOException {
+        getLogger().log(Level.FINE, "JMSConduit send message");
+
+        final OutputStream cachedStream = new JMSOutputStream(message);
+        message.setContent(OutputStream.class, cachedStream);
+    }
+
+    /**
+     * Closes the conduit and releases whatever the shared transport base is holding.
+     */
+    public void close() {
+        final Logger log = getLogger();
+        log.log(Level.FINE, "JMSConduit closed ");
+
+        // The actual cleanup is delegated to the transport base.
+        base.close();
+    }
+
+ /**
+ * Returns the logger used by this conduit.
+ *
+ * @return the class-wide {@code LOG} instance
+ */
+ protected Logger getLogger() {
+ return LOG;
+ }
+
+ /**
+ * Builds the bean name for this conduit.
+ *
+ * @return the endpoint name held by the transport base followed by {@code .jms-conduit}
+ */
+ public String getBeanName() {
+ return base.endpointInfo.getName().toString() + ".jms-conduit";
+ }
+
+ private void initConfig() {
+
+/*
+ this.address = base.endpointInfo.getTraversedExtensor(new
AddressType(),
+
AddressType.class);
+ this.sessionPool = base.endpointInfo.getTraversedExtensor(new
SessionPoolType(),
+
SessionPoolType.class);
+ this.clientConfig = base.endpointInfo.getTraversedExtensor(new
ClientConfig(),
+
ClientConfig.class);
+ this.runtimePolicy = base.endpointInfo.getTraversedExtensor(new
ClientBehaviorPolicyType(),
+
ClientBehaviorPolicyType.class);
+*/
+
+ Configurer configurer = base.bus.getExtension(Configurer.class);
+ if (null != configurer) {
+ configurer.configureBean(this);
+ }
+ }
+
+ private class JMSOutputStream extends AbstractCachedOutputStream {
+ private Message outMessage;
+ private boolean isOneWay;
+
+ public JMSOutputStream(Message m) {
+ outMessage = m;
+ }
+
+ protected void doFlush() throws IOException {
+ //do nothing here
+ }
+
+ protected void doClose() throws IOException {
+ isOneWay = outMessage.getExchange().isOneWay();
+ commitOutputMessage();
+ if (!isOneWay) {
+ handleResponse();
+ }
+ }
+
+ protected void onWrite() throws IOException {
+
+ }
+
+ private void commitOutputMessage() {
+ base.client.send(targetCamelEndpointUri, new
Processor<org.apache.camel.Exchange>() {
+ public void onExchange(org.apache.camel.Exchange reply) {
+ Object request = null;
+
+ if (isTextPayload()) {
+ request = currentStream.toString();
+ }
+ else {
+ request = ((ByteArrayOutputStream)
currentStream).toByteArray();
+ }
+
+ getLogger().log(Level.FINE, "Conduit Request is :[" +
request + "]");
+ String replyTo = base.getReplyDestination();
+
+ //TODO setting up the responseExpected
+
+ base.marshal(request, replyTo, reply);
+
+ base.setMessageProperties(outMessage, reply);
+
+ String correlationID = null;
+ if (!isOneWay) {
+ // TODO create a correlationID
+ String id = null;
+
+ if (id != null) {
+ if (correlationID != null) {
+ String error = "User cannot set
JMSCorrelationID when "
+ + "making a request/reply invocation
using "
+ + "a static replyTo Queue.";
+ }
+ correlationID = id;
+ }
+ }
+
+ if (correlationID != null) {
+
reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, correlationID);
+ }
+ else {
+ //No message correlation id is set. Whatever comeback
will be accepted as responses.
+ // We assume that it will only happen in case of the
temp. reply queue.
+ }
+
+ getLogger().log(Level.FINE, "client sending request: ",
reply.getIn());
+ }
+ });
+ }
+
+ private void handleResponse() throws IOException {
+ // REVISIT distinguish decoupled case or oneway call
+ Object response = null;
+
+ //TODO if outMessage need to get the response
+ Message inMessage = new MessageImpl();
+ outMessage.getExchange().setInMessage(inMessage);
+ //set the message header back to the incomeMessage
+ //inMessage.put(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
+ //
outMessage.get(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
+
+ /*
+ Object result1;
+
+ Object result = null;
+
+ javax.jms.Message jmsMessage1 =
pooledSession.consumer().receive(timeout);
+ getLogger().log(Level.FINE, "client received reply: " ,
jmsMessage1);
+
+ if (jmsMessage1 != null) {
+
+ base.populateIncomingContext(jmsMessage1, outMessage,
JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS);
+ String messageType = jmsMessage1 instanceof TextMessage
+ ? JMSConstants.TEXT_MESSAGE_TYPE :
JMSConstants.BINARY_MESSAGE_TYPE;
+ result = base.unmarshal((org.apache.camel.Exchange)
outMessage);
+ result1 = result;
+ } else {
+ String error = "JMSClientTransport.receive() timed out. No
message available.";
+ getLogger().log(Level.SEVERE, error);
+ //TODO: Review what exception should we throw.
+ throw new JMSException(error);
+
+ }
+ response = result1;
+
+ //set the message header back to the incomeMessage
+ inMessage.put(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
+
outMessage.get(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
+
+ */
+
+ getLogger().log(Level.FINE, "The Response Message is : [" +
response + "]");
+
+ // setup the inMessage response stream
+ byte[] bytes = null;
+ if (response instanceof String) {
+ String requestString = (String) response;
+ bytes = requestString.getBytes();
+ }
+ else {
+ bytes = (byte[]) response;
+ }
+ inMessage.setContent(InputStream.class, new
ByteArrayInputStream(bytes));
+ getLogger().log(Level.FINE, "incoming observer is " +
incomingObserver);
+ incomingObserver.onMessage(inMessage);
+ }
+ }
+
+ /**
+ * Tells whether the outbound payload is sent as text rather than as a byte array.
+ *
+ * @return always {@code true} for now
+ */
+ private boolean isTextPayload() {
+ // TODO use runtime policy
+ return true;
+ }
+
+ /**
+ * Represented decoupled response endpoint.
+ */
+ protected class DecoupledDestination implements Destination {
+ protected MessageObserver decoupledMessageObserver;
+ private EndpointReferenceType address;
+
+ DecoupledDestination(EndpointReferenceType ref,
+ MessageObserver incomingObserver) {
+ address = ref;
+ decoupledMessageObserver = incomingObserver;
+ }
+
+ public EndpointReferenceType getAddress() {
+ return address;
+ }
+
+ public Conduit getBackChannel(Message inMessage,
+ Message partialResponse,
+ EndpointReferenceType addr)
+ throws IOException {
+ // shouldn't be called on decoupled endpoint
+ return null;
+ }
+
+ public void shutdown() {
+ // TODO Auto-generated method stub
+ }
+
+ public synchronized void setMessageObserver(MessageObserver observer) {
+ decoupledMessageObserver = observer;
+ }
+
+ protected synchronized MessageObserver getMessageObserver() {
+ return decoupledMessageObserver;
+ }
+ }
+}
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java?view=auto&rev=522906
==============================================================================
---
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
(added)
+++
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+/**
+ * String constants used by the Camel transport classes: payload type names, message
+ * property keys, configuration namespace URIs and configuration ids.
+ *
+ * @version $Revision$
+ */
+public class CamelConstants {
+
+    // Payload type names
+    public static final String TEXT_MESSAGE_TYPE = "text";
+    public static final String BINARY_MESSAGE_TYPE = "binary";
+
+    // Message property keys
+    public static final String CAMEL_SERVER_REQUEST_HEADERS = "org.apache.cxf.camel.server.request.headers";
+    public static final String CAMEL_SERVER_RESPONSE_HEADERS = "org.apache.cxf.camel.server.response.headers";
+    public static final String CAMEL_REQUEST_MESSAGE = "org.apache.cxf.camel.request.message";
+    // The value is spelled "reponse"; it is a runtime key, so the spelling is kept as-is.
+    public static final String CAMEL_RESPONSE_MESSAGE = "org.apache.cxf.camel.reponse.message";
+    public static final String CAMEL_CLIENT_REQUEST_HEADERS = "org.apache.cxf.camel.client.request.headers";
+    public static final String CAMEL_CLIENT_RESPONSE_HEADERS = "org.apache.cxf.camel.client.response.headers";
+
+    public static final String CAMEL_CLIENT_RECEIVE_TIMEOUT = "org.apache.cxf.camel.client.timeout";
+
+    // Configuration namespace URIs
+    public static final String CAMEL_SERVER_CONFIGURATION_URI =
+        "http://cxf.apache.org/configuration/transport/camel-server";
+    public static final String CAMEL_CLIENT_CONFIGURATION_URI =
+        "http://cxf.apache.org/configuration/transport/camel-client";
+    public static final String ENDPOINT_CONFIGURATION_URI = "http://cxf.apache.org/jaxws/endpoint-config";
+    public static final String SERVICE_CONFIGURATION_URI = "http://cxf.apache.org/jaxws/service-config";
+    public static final String PORT_CONFIGURATION_URI = "http://cxf.apache.org/jaxws/port-config";
+
+    // Configuration ids
+    public static final String CAMEL_CLIENT_CONFIG_ID = "camel-client";
+    public static final String CAMEL_SERVER_CONFIG_ID = "camel-server";
+
+    public static final String CAMEL_REBASED_REPLY_TO = "org.apache.cxf.camel.server.replyto";
+
+    public static final String CAMEL_CORRELATION_ID = "org.apache.cxf.camel.correlationId";
+}
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java?view=auto&rev=522906
==============================================================================
---
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
(added)
+++
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,247 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Endpoint;
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.configuration.Configurable;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.AbstractDestination;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @version $Revision$
+ */
+public class CamelDestination extends AbstractDestination implements
Configurable {
+ protected static final String BASE_BEAN_NAME_SUFFIX =
".jms-destination-base";
+ private static final Logger LOG =
LogUtils.getL7dLogger(CamelDestination.class);
+ CamelContext camelContext;
+ String camelUri;
+ final ConduitInitiator conduitInitiator;
+ private CamelTransportBase base;
+ private Endpoint endpoint;
+
+    /**
+     * Creates a destination that receives CXF messages through Camel.
+     *
+     * @param camelContext the Camel context kept by this destination and its transport base
+     * @param bus the CXF bus handed to the superclass lookup and to the transport base
+     * @param ci the conduit initiator kept by this destination
+     * @param info the endpoint description passed to the superclass
+     * @throws IOException declared on the signature; the visible statements do not throw it directly
+     */
+    public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info)
+        throws IOException {
+        super(getTargetReference(info, bus), info);
+        this.camelContext = camelContext;
+        this.conduitInitiator = ci;
+
+        this.base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
+
+        initConfig();
+    }
+
+ /**
+ * Returns the logger used by this destination.
+ *
+ * @return the class-wide {@code LOG} instance
+ */
+ protected Logger getLogger() {
+ return LOG;
+ }
+
+ /**
+ * @param inMessage the incoming message
+ * @return the inbuilt backchannel
+ */
+ protected Conduit getInbuiltBackChannel(Message inMessage) {
+ return new
BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
+ inMessage);
+ }
+
+ public void activate() {
+ getLogger().log(Level.INFO, "CamelDestination activate().... ");
+
+ try {
+ getLogger().log(Level.FINE, "establishing Camel connection");
+ endpoint = camelContext.resolveEndpoint(camelUri);
+ }
+ catch (Exception ex) {
+ getLogger().log(Level.SEVERE, "Camel connect failed with
EException : ", ex);
+ }
+ }
+
+ /**
+ * Deactivates the destination by closing the shared transport base.
+ */
+ public void deactivate() {
+ base.close();
+ }
+
+ /**
+ * Shuts the destination down; after logging, this simply calls {@code deactivate()}.
+ */
+ public void shutdown() {
+ getLogger().log(Level.FINE, "CamelDestination shutdown()");
+ this.deactivate();
+ }
+
+    /**
+     * Turns an incoming Camel exchange into a CXF message and passes it to the observer.
+     *
+     * @param exchange the Camel exchange carrying the request
+     */
+    protected void incoming(Exchange exchange) {
+        // The "{0}" placeholder is what makes java.util.logging render the parameter.
+        getLogger().log(Level.FINE, "server received request: {0}", exchange);
+
+        byte[] bytes = base.unmarshal(exchange);
+
+        // build the CXF message that will be handed to the observer
+        MessageImpl inMessage = new MessageImpl();
+        inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
+        base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS);
+        //inMessage.put(JMSConstants.CAMEL_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
+        // keep the originating exchange on the message so the reply path can find it
+        inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange);
+
+        inMessage.setDestination(this);
+
+        //handle the incoming message
+        incomingObserver.onMessage(inMessage);
+    }
+
+ /**
+ * Builds the bean name for this destination.
+ *
+ * @return the endpoint name followed by {@code .jms-destination}
+ */
+ public String getBeanName() {
+ return endpointInfo.getName().toString() + ".jms-destination";
+ }
+
+ /**
+ * Configuration hook called from the constructor. It currently does nothing: the policy
+ * lookups below are commented out.
+ */
+ private void initConfig() {
+/*
+ this.runtimePolicy = endpointInfo.getTraversedExtensor(new
ServerBehaviorPolicyType(),
+
ServerBehaviorPolicyType.class);
+ this.serverConfig = endpointInfo.getTraversedExtensor(new
ServerConfig(), ServerConfig.class);
+ this.address = endpointInfo.getTraversedExtensor(new AddressType(),
AddressType.class);
+ this.sessionPool = endpointInfo.getTraversedExtensor(new
SessionPoolType(), SessionPoolType.class);
+*/
+ }
+
+    /**
+     * Camel processor that feeds received exchanges into {@code incoming(Exchange)}.
+     * Any failure is logged as a warning and not propagated.
+     */
+    protected class ConsumerProcessor implements Processor<Exchange> {
+        public void onExchange(Exchange exchange) {
+            try {
+                incoming(exchange);
+            } catch (Throwable ex) {
+                final Logger log = getLogger();
+                log.log(Level.WARNING, "Failed to process incoming message : ", ex);
+            }
+        }
+    }
+
+    /**
+     * Conduit used to send the response for a received request. It works on CXF messages.
+     */
+    protected class BackChannelConduit extends AbstractConduit {
+        protected Message inMessage;
+
+        BackChannelConduit(EndpointReferenceType ref, Message message) {
+            super(ref);
+            this.inMessage = message;
+        }
+
+        /**
+         * Ignored: a back channel conduit is not expected to be given an observer.
+         *
+         * @param observer the observer to notify on receipt of incoming
+         */
+        public void setMessageObserver(MessageObserver observer) {
+            // intentionally empty
+        }
+
+        /**
+         * Prepares an outbound message, assumed to contain all the name-value mappings of the
+         * corresponding input message (if any). The request stored on the input message is
+         * copied across and the message is given a {@link CamelOutputStream}.
+         *
+         * @param message the message to be sent.
+         * @throws IOException declared on the signature; the visible code does not throw it
+         */
+        public void send(Message message) throws IOException {
+            // carry the original request over to the message being sent back
+            final Object originalRequest = inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE);
+            message.put(CamelConstants.CAMEL_REQUEST_MESSAGE, originalRequest);
+
+            final OutputStream replyStream = new CamelOutputStream(inMessage);
+            message.setContent(OutputStream.class, replyStream);
+        }
+
+        protected Logger getLogger() {
+            return LOG;
+        }
+    }
+
+    /**
+     * Output stream handed to CXF for a reply message. Closing the stream sends the cached
+     * payload to the reply destination through the Camel client.
+     */
+    private class CamelOutputStream extends AbstractCachedOutputStream {
+        private Message inMessage;
+
+        /**
+         * @param m the incoming message the reply belongs to
+         */
+        public CamelOutputStream(Message m) {
+            super();
+            inMessage = m;
+        }
+
+        /**
+         * Sends the cached payload to the reply destination, copying the correlation id and
+         * message properties of the incoming message onto the outgoing exchange.
+         */
+        private void commitOutputMessage() throws IOException {
+
+            //setup the reply message
+            final String replyToUri = getReplyToDestination(inMessage);
+
+            base.client.send(replyToUri, new Processor<Exchange>() {
+                public void onExchange(Exchange reply) {
+                    base.marshal(currentStream.toString(), replyToUri, reply);
+
+                    setReplyCorrelationID(inMessage, reply);
+
+                    base.setMessageProperties(inMessage, reply);
+
+                    // The "{0}" placeholder is what makes java.util.logging render the parameter.
+                    getLogger().log(Level.FINE, "just server sending reply: {0}", reply);
+                }
+            });
+        }
+
+        @Override
+        protected void doFlush() throws IOException {
+            // Do nothing here
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+            commitOutputMessage();
+        }
+
+        @Override
+        protected void onWrite() throws IOException {
+            // Do nothing here
+        }
+    }
+
+ protected String getReplyToDestination(Message inMessage) {
+ if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) {
+ return (String)
inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO);
+ }
+ else {
+ return base.getReplyDestination();
+ }
+ }
+
+ protected void setReplyCorrelationID(Message inMessage, Exchange reply) {
+ Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID);
+ if (value != null) {
+ reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID,
value);
+ }
+ }
+}
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java?view=auto&rev=522906
==============================================================================
---
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
(added)
+++
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+/**
+ * Placeholder class for the Camel transport; it declares no members yet.
+ *
+ * @version $Revision$
+ */
+public class CamelTransport {
+}
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java?view=auto&rev=522906
==============================================================================
---
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
(added)
+++
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,185 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.CamelClient;
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+
+/**
+ * @version $Revision$
+ */
+public class CamelTransportBase {
+ private String replyDestination;
+ CamelClient<Exchange> client;
+ private final CamelContext camelContext;
+ Bus bus;
+ EndpointInfo endpointInfo;
+
+    /**
+     * Creates the state shared by the Camel conduit and destination, including a new
+     * {@link CamelClient} bound to the given context.
+     *
+     * @param camelContext the Camel context used to create the client
+     * @param bus the CXF bus kept for later lookups
+     * @param endpointInfo the endpoint description kept for later lookups
+     * @param b not used by this constructor
+     * @param baseBeanNameSuffix not used by this constructor
+     */
+    public CamelTransportBase(CamelContext camelContext,
+                              Bus bus,
+                              EndpointInfo endpointInfo,
+                              boolean b,
+                              String baseBeanNameSuffix) {
+        this.endpointInfo = endpointInfo;
+        this.bus = bus;
+        this.camelContext = camelContext;
+        this.client = new CamelClient<Exchange>(camelContext);
+    }
+
+ /**
+ * Hook for copying header information from an incoming exchange onto the CXF message.
+ * The body is empty, so calling it currently has no effect.
+ *
+ * @param exchange the incoming Camel exchange
+ * @param inMessage the CXF message being populated
+ * @param jmsServerRequestHeaders the message key the headers would be stored under
+ *        (presumably; the value is not used yet)
+ */
+ public void populateIncomingContext(Exchange exchange, MessageImpl
inMessage, String jmsServerRequestHeaders) {
+
+ }
+
+
+ /**
+ * Returns the reply destination.
+ *
+ * @return the {@code replyDestination} field; nothing in this class assigns it, so it is
+ * {@code null} unless that changes
+ */
+ public String getReplyDestination() {
+ return replyDestination;
+ }
+
+ /**
+ * Hook for copying properties from a CXF message onto a Camel exchange.
+ * The body is empty, so calling it currently has no effect.
+ *
+ * @param inMessage the CXF message to read from
+ * @param reply the Camel exchange to write to
+ */
+ public void setMessageProperties(Message inMessage, Exchange reply) {
+
+ }
+
+ public void close() {
+ if (client != null) {
+ try {
+ client.stop();
+ }
+ catch (Exception e) {
+ // do nothing?
+ // TODO
+ }
+ }
+ }
+
+ /**
+ * Populates a Camel exchange with a payload
+ *
+ * @param payload the message payload, expected to be either of type
+ * String or byte[] depending on payload type
+ * @param replyTo the ReplyTo destination if any
+ * @param exchange the underlying exchange to marshal to
+ */
+ protected void marshal(Object payload, String replyTo, Exchange
exchange) {
+ org.apache.camel.Message message = exchange.getIn();
+ message.setBody(payload);
+ if (replyTo != null) {
+ message.setHeader(CamelConstants.CAMEL_CORRELATION_ID, replyTo);
+ }
+
+ }
+
+    /**
+     * Extracts the payload of an incoming exchange.
+     *
+     * @param exchange the exchange whose in message holds the payload
+     * @return the in message body as returned by {@code getBody(byte[].class)}
+     */
+    public byte[] unmarshal(Exchange exchange) {
+        final org.apache.camel.Message in = exchange.getIn();
+        return in.getBody(byte[].class);
+    }
+
+ /*
+ protected JMSMessageHeadersType populateIncomingContext(javax.jms.Message
message,
+
org.apache.cxf.message.Message inMessage,
+ String headerType)
throws JMSException {
+ JMSMessageHeadersType headers = null;
+
+ headers = (JMSMessageHeadersType)inMessage.get(headerType);
+
+ if (headers == null) {
+ headers = new JMSMessageHeadersType();
+ inMessage.put(headerType, headers);
+ }
+
+ headers.setJMSCorrelationID(message.getJMSCorrelationID());
+ headers.setJMSDeliveryMode(new Integer(message.getJMSDeliveryMode()));
+ headers.setJMSExpiration(new Long(message.getJMSExpiration()));
+ headers.setJMSMessageID(message.getJMSMessageID());
+ headers.setJMSPriority(new Integer(message.getJMSPriority()));
+
headers.setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered()));
+ headers.setJMSTimeStamp(new Long(message.getJMSTimestamp()));
+ headers.setJMSType(message.getJMSType());
+
+ List<JMSPropertyType> props = headers.getProperty();
+ Enumeration enm = message.getPropertyNames();
+ while (enm.hasMoreElements()) {
+ String name = (String)enm.nextElement();
+ String val = message.getStringProperty(name);
+ JMSPropertyType prop = new JMSPropertyType();
+ prop.setName(name);
+ prop.setValue(val);
+ props.add(prop);
+ }
+
+ return headers;
+ }
+
+ protected int getJMSDeliveryMode(JMSMessageHeadersType headers) {
+ int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+
+ if (headers != null && headers.isSetJMSDeliveryMode()) {
+ deliveryMode = headers.getJMSDeliveryMode();
+ }
+ return deliveryMode;
+ }
+
+ protected int getJMSPriority(JMSMessageHeadersType headers) {
+ int priority = Message.DEFAULT_PRIORITY;
+ if (headers != null && headers.isSetJMSPriority()) {
+ priority = headers.getJMSPriority();
+ }
+ return priority;
+ }
+
+ protected long getTimeToLive(JMSMessageHeadersType headers) {
+ long ttl = -1;
+ if (headers != null && headers.isSetTimeToLive()) {
+ ttl = headers.getTimeToLive();
+ }
+ return ttl;
+ }
+
+ protected String getCorrelationId(JMSMessageHeadersType headers) {
+ String correlationId = null;
+ if (headers != null
+ && headers.isSetJMSCorrelationID()) {
+ correlationId = headers.getJMSCorrelationID();
+ }
+ return correlationId;
+ }
+
+
+ protected String getAddrUriFromJMSAddrPolicy() {
+ AddressType jmsAddressPolicy = transport.getJMSAddress();
+ return "jms:" + jmsAddressPolicy.getJndiConnectionFactoryName()
+ + "#"
+ + jmsAddressPolicy.getJndiDestinationName();
+ }
+
+ protected String getReplyTotAddrUriFromJMSAddrPolicy() {
+ AddressType jmsAddressPolicy = transport.getJMSAddress();
+ return "jms:"
+ + jmsAddressPolicy.getJndiConnectionFactoryName()
+ + "#"
+ + jmsAddressPolicy.getJndiReplyDestinationName();
+ }
+
+ protected boolean isDestinationStyleQueue() {
+ return JMSConstants.CAMEL_QUEUE.equals(
+ transport.getJMSAddress().getDestinationStyle().value());
+ }
+ */
+}
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java?view=auto&rev=522906
==============================================================================
---
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
(added)
+++
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.CamelContext;
+import org.apache.cxf.Bus;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractTransportFactory;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Transport factory that hands out {@link CamelConduit} and
+ * {@link CamelDestination} instances and advertises the <code>camel://</code>
+ * URI prefix.
+ * <p>
+ * Both the {@link Bus} and the {@link CamelContext} are supplied through
+ * setters annotated with <code>@Resource</code>; they are passed unchanged to
+ * every conduit and destination this factory creates.
+ *
+ * @version $Revision$
+ */
+public class CamelTransportFactory extends AbstractTransportFactory implements ConduitInitiator, DestinationFactory {
+    /**
+     * URI prefixes handled by this factory. The set is wrapped as unmodifiable
+     * because it is shared by all instances and returned directly from
+     * {@link #getUriPrefixes()}.
+     */
+    private static final Set<String> URI_PREFIXES;
+
+    static {
+        Set<String> prefixes = new HashSet<String>();
+        prefixes.add("camel://");
+        URI_PREFIXES = java.util.Collections.unmodifiableSet(prefixes);
+    }
+
+    private Bus bus;
+    private CamelContext camelContext;
+
+    /**
+     * Stores the bus handed to the conduits and destinations created later.
+     *
+     * @param b the bus to use
+     */
+    @Resource
+    public void setBus(Bus b) {
+        bus = b;
+    }
+
+    /**
+     * @return the bus previously set, or <code>null</code> if none was set
+     */
+    public Bus getBus() {
+        return bus;
+    }
+
+    /**
+     * @return the Camel context previously set, or <code>null</code> if none was set
+     */
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    /**
+     * Stores the Camel context handed to the conduits and destinations created later.
+     *
+     * @param camelContext the Camel context to use
+     */
+    @Resource
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * Creates a conduit for the given endpoint without an explicit target;
+     * equivalent to calling {@link #getConduit(EndpointInfo, EndpointReferenceType)}
+     * with a <code>null</code> target.
+     *
+     * @param targetInfo the endpoint description
+     * @return a new {@link CamelConduit}
+     * @throws IOException declared by the interface; not thrown directly here
+     */
+    public Conduit getConduit(EndpointInfo targetInfo) throws IOException {
+        return getConduit(targetInfo, null);
+    }
+
+    /**
+     * Creates a new {@link CamelConduit} from the current Camel context and bus.
+     *
+     * @param endpointInfo the endpoint description
+     * @param target the target endpoint reference, may be <code>null</code>
+     * @return a new {@link CamelConduit}
+     * @throws IOException declared by the interface; not thrown directly here
+     */
+    public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws IOException {
+        return new CamelConduit(camelContext, bus, endpointInfo, target);
+    }
+
+    /**
+     * Creates a new {@link CamelDestination} and, when the bus exposes a
+     * {@link Configurer} extension, passes the destination to it via
+     * <code>configureBean</code> before returning.
+     * <p>
+     * The bus must have been set beforehand, since it is dereferenced to look
+     * up the configurer.
+     *
+     * @param endpointInfo the endpoint description
+     * @return the newly created destination
+     * @throws IOException declared by the interface; not thrown directly here
+     */
+    public Destination getDestination(EndpointInfo endpointInfo) throws IOException {
+        CamelDestination destination = new CamelDestination(camelContext, bus, this, endpointInfo);
+        Configurer configurer = bus.getExtension(Configurer.class);
+        if (null != configurer) {
+            configurer.configureBean(destination);
+        }
+        return destination;
+    }
+
+    /**
+     * @return an unmodifiable set containing the single prefix <code>camel://</code>
+     */
+    public Set<String> getUriPrefixes() {
+        return URI_PREFIXES;
+    }
+}
+
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain