Author: ffang
Date: Fri Oct 27 02:48:01 2006
New Revision: 468334
URL: http://svn.apache.org/viewvc?view=rev&rev=468334
Log:
[CXF-38] JBI Integration -- JBI transport implementation
Added:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
(with props)
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
(with props)
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
(with props)
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
(with props)
Modified:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java
Modified:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java?view=diff&rev=468334&r1=468333&r2=468334
==============================================================================
---
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
(original)
+++
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
Fri Oct 27 02:48:01 2006
@@ -20,43 +20,68 @@
package org.apache.cxf.jbi.transport;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.jbi.messaging.DeliveryChannel;
+
+
+
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
public class JBIConduit implements Conduit {
+
+ private static final Logger LOG = LogUtils.getL7dLogger(JBIConduit.class);
+
+ private MessageObserver incomingObserver;
+ private EndpointReferenceType target;
+ private DeliveryChannel channel;
+
+
+
+ public JBIConduit(EndpointReferenceType target, DeliveryChannel dc) {
+ this.target = target;
+ channel = dc;
+ }
public void send(Message message) throws IOException {
- // TODO Auto-generated method stub
-
+ LOG.log(Level.FINE, "JBIConduit send message");
+
+ message.setContent(OutputStream.class,
+ new JBIConduitOutputStream(message, channel,
target, this));
}
public void close(Message message) throws IOException {
- // TODO Auto-generated method stub
-
+ message.getContent(OutputStream.class).close();
}
public EndpointReferenceType getTarget() {
- // TODO Auto-generated method stub
- return null;
+ return target;
}
public Destination getBackChannel() {
- // TODO Auto-generated method stub
return null;
}
public void close() {
- // TODO Auto-generated method stub
-
+
}
public void setMessageObserver(MessageObserver observer) {
- // TODO Auto-generated method stub
-
+ incomingObserver = observer;
+ }
+
+ public MessageObserver getMessageObserver() {
+ return incomingObserver;
}
+
+
}
Added:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java?view=auto&rev=468334
==============================================================================
---
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
(added)
+++
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
Fri Oct 27 02:48:01 2006
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.logging.Logger;
+
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jws.WebService;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
+
+public class JBIConduitOutputStream extends AbstractCachedOutputStream {
+
+ private static final Logger LOG =
LogUtils.getL7dLogger(JBIConduitOutputStream.class);
+
+ private Message message;
+ private boolean isOneWay;
+ private DeliveryChannel channel;
+ private JBIConduit conduit;
+ private EndpointReferenceType target;
+
+ public JBIConduitOutputStream(Message m, DeliveryChannel channel,
EndpointReferenceType target,
+ JBIConduit conduit) {
+ message = m;
+ this.channel = channel;
+ this.conduit = conduit;
+ this.target = target;
+ }
+
+ @Override
+ protected void doFlush() throws IOException {
+
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ isOneWay = message.getExchange().isOneWay();
+ commitOutputMessage();
+
+ }
+
+ private void commitOutputMessage() throws IOException {
+ try {
+ Method targetMethod = (Method)message.get(Method.class.getName());
+ Class<?> clz = targetMethod.getDeclaringClass();
+
+ LOG.info(new org.apache.cxf.common.i18n.Message("INVOKE.SERVICE",
LOG).toString() + clz);
+
+ WebService ws = clz.getAnnotation(WebService.class);
+ assert ws != null;
+ QName interfaceName = new QName(ws.targetNamespace(), ws.name());
+ QName serviceName = EndpointReferenceUtils.getServiceName(target);
+ MessageExchangeFactory factory =
channel.createExchangeFactoryForService(serviceName);
+ LOG.info(new
org.apache.cxf.common.i18n.Message("CREATE.MESSAGE.EXCHANGE", LOG).toString()
+ + serviceName);
+ MessageExchange xchng = null;
+ if (isOneWay) {
+ xchng = factory.createInOnlyExchange();
+ } else {
+ xchng = factory.createInOutExchange();
+ }
+
+ NormalizedMessage inMsg = xchng.createMessage();
+ LOG.info(new
org.apache.cxf.common.i18n.Message("EXCHANGE.ENDPOINT", LOG).toString()
+ + xchng.getEndpoint());
+
+ InputStream ins = null;
+
+ if (inMsg != null) {
+ LOG.info("setup message contents on " + inMsg);
+ inMsg.setContent(getMessageContent(message));
+ xchng.setService(serviceName);
+ LOG.info("service for exchange " + serviceName);
+
+ xchng.setInterfaceName(interfaceName);
+
+ xchng.setOperation(new QName(targetMethod.getName()));
+ if (isOneWay) {
+ ((InOnly)xchng).setInMessage(inMsg);
+ } else {
+ ((InOut)xchng).setInMessage(inMsg);
+ }
+ LOG.info("sending message");
+ if (!isOneWay) {
+
+ channel.sendSync(xchng);
+ NormalizedMessage outMsg = ((InOut)xchng).getOutMessage();
+ ins =
JBIMessageHelper.convertMessageToInputStream(outMsg.getContent());
+ if (ins == null) {
+ throw new IOException(new
org.apache.cxf.common.i18n.Message(
+ "UNABLE.RETRIEVE.MESSAGE", LOG).toString());
+ }
+ Message inMessage = new MessageImpl();
+ message.getExchange().setInMessage(inMessage);
+ inMessage.setContent(InputStream.class, ins);
+ conduit.getMessageObserver().onMessage(inMessage);
+
+ } else {
+ channel.send(xchng);
+ }
+
+ } else {
+ LOG.info(new org.apache.cxf.common.i18n.Message("NO.MESSAGE",
LOG).toString());
+ }
+
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IOException(e.toString());
+ }
+ }
+
+ private Source getMessageContent(Message message2) {
+ ByteArrayOutputStream bos = (ByteArrayOutputStream)getOut();
+ return new StreamSource(new ByteArrayInputStream(bos.toByteArray()));
+
+ }
+
+ @Override
+ protected void onWrite() throws IOException {
+
+ }
+
+}
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java?view=auto&rev=468334
==============================================================================
---
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
(added)
+++
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
Fri Oct 27 02:48:01 2006
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+public final class JBIConstants {
+ public static final String MESSAGE_EXCHANGE_PROPERTY =
"celtix.jbi.message.exchange";
+}
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java?view=diff&rev=468334&r1=468333&r2=468334
==============================================================================
---
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
(original)
+++
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
Fri Oct 27 02:48:01 2006
@@ -20,34 +20,252 @@
package org.apache.cxf.jbi.transport;
import java.io.IOException;
-
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jbi.se.CXFServiceUnit;
+import org.apache.cxf.jbi.se.CXFServiceUnitManager;
import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
import org.apache.cxf.transport.Destination;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
public class JBIDestination implements Destination {
+ private static final Logger LOG =
LogUtils.getL7dLogger(JBIDestination.class);
+ private final DeliveryChannel channel;
+ private final CXFServiceUnitManager suManager;
+ private ConduitInitiator conduitInitiator;
+ private EndpointInfo endpointInfo;
+ private EndpointReferenceType reference;
+ private MessageObserver incomingObserver;
+ private JBIDispatcher dispatcher;
+ private volatile boolean running;
+
+ public JBIDestination(ConduitInitiator ci,
+ EndpointInfo info,
+ DeliveryChannel dc,
+ CXFServiceUnitManager sum) {
+ this.conduitInitiator = ci;
+ this.endpointInfo = info;
+ this.channel = dc;
+ this.suManager = sum;
+ reference = new EndpointReferenceType();
+ AttributedURIType address = new AttributedURIType();
+ address.setValue(endpointInfo.getAddress());
+ reference.setAddress(address);
+ }
+
public EndpointReferenceType getAddress() {
- // TODO Auto-generated method stub
- return null;
+ return reference;
}
public Conduit getBackChannel(Message inMessage, Message partialResponse,
EndpointReferenceType address)
throws IOException {
- // TODO Auto-generated method stub
- return null;
+ Conduit backChannel = null;
+ if (address == null) {
+ backChannel = new BackChannelConduit(address, inMessage, this);
+ } else {
+ if (partialResponse != null) {
+ // just send back the partialResponse
+ backChannel = new BackChannelConduit(address, inMessage ,
this);
+ } else {
+ backChannel = conduitInitiator.getConduit(endpointInfo,
address);
+ // ensure decoupled back channel input stream is closed
+ backChannel.setMessageObserver(new MessageObserver() {
+ public void onMessage(Message m) {
+ //need to set up the headers
+ if (m.getContentFormats().contains(InputStream.class))
{
+ InputStream is = m.getContent(InputStream.class);
+ try {
+ is.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ });
+ }
+ }
+ return backChannel;
+
}
public void shutdown() {
- // TODO Auto-generated method stub
-
+ running = false;
}
public void setMessageObserver(MessageObserver observer) {
- // TODO Auto-generated method stub
+ if (null != observer) {
+ try {
+ activate();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LOG.log(Level.FINE, "JBIDestination shutdown()");
+ try {
+ deactivate();
+ } catch (IOException e) {
+ //Ignore for now.
+ }
+ }
+ incomingObserver = observer;
}
+ private void deactivate() throws IOException {
+ running = false;
+ }
+
+ private void activate() throws IOException {
+ LOG.info(new org.apache.cxf.common.i18n.Message(
+ "ACTIVE.JBI.SERVER.TRANSPORT", LOG).toString());
+ dispatcher = new JBIDispatcher();
+ new Thread(dispatcher).start();
+ }
+
+ // this should deal with the cxf message
+ protected class BackChannelConduit implements Conduit {
+
+ protected Message inMessage;
+ protected EndpointReferenceType target;
+ protected JBIDestination jbiDestination;
+
+ BackChannelConduit(EndpointReferenceType ref, Message message,
JBIDestination dest) {
+ inMessage = message;
+ target = ref;
+ jbiDestination = dest;
+ }
+
+ public void close(Message msg) throws IOException {
+ msg.getContent(OutputStream.class).close();
+ }
+
+ /**
+ * Register a message observer for incoming messages.
+ *
+ * @param observer the observer to notify on receipt of incoming
+ */
+ public void setMessageObserver(MessageObserver observer) {
+ // shouldn't be called for a back channel conduit
+ }
+
+ /**
+ * Send an outbound message, assumed to contain all the name-value
+ * mappings of the corresponding input message (if any).
+ *
+ * @param message the message to be sent.
+ */
+ public void send(Message message) throws IOException {
+ // setup the message to be send back
+ message.put(JBIConstants.MESSAGE_EXCHANGE_PROPERTY,
+ inMessage.get(JBIConstants.MESSAGE_EXCHANGE_PROPERTY));
+ message.setContent(OutputStream.class,
+ new JBIDestinationOutputStream(inMessage,
channel));
+ }
+
+ /**
+ * @return the reference associated with the target Destination
+ */
+ public EndpointReferenceType getTarget() {
+ return target;
+ }
+
+ /**
+ * Retreive the back-channel Destination.
+ *
+ * @return the backchannel Destination (or null if the backchannel is
+ * built-in)
+ */
+ public Destination getBackChannel() {
+ return null;
+ }
+
+ /**
+ * Close the conduit
+ */
+ public void close() {
+ }
+ }
+
+ private class JBIDispatcher implements Runnable {
+
+ public final void run() {
+
+ try {
+ running = true;
+ LOG.info(new org.apache.cxf.common.i18n.Message(
+ "RECEIVE.THREAD.START", LOG).toString());
+ do {
+ MessageExchange exchange = channel.accept();
+ if (exchange != null) {
+ // REVISIT: serialized message handling not such a
+ // good idea.
+ // REVISIT: can there be more than one ep?
+ ServiceEndpoint ep = exchange.getEndpoint();
+ CXFServiceUnit csu =
suManager.getServiceUnitForEndpoint(ep);
+ ClassLoader oldLoader =
Thread.currentThread().getContextClassLoader();
+
+ try {
+
Thread.currentThread().setContextClassLoader(csu.getClassLoader());
+ if (csu != null) {
+ LOG.info(new
org.apache.cxf.common.i18n.Message(
+ "DISPATCH.TO.SU", LOG).toString());
+ dispatch(exchange);
+ } else {
+ LOG.info(new
org.apache.cxf.common.i18n.Message(
+ "NO.SU.FOUND", LOG).toString());
+ }
+ } finally {
+
Thread.currentThread().setContextClassLoader(oldLoader);
+ }
+ }
+ } while(running);
+ } catch (Exception ex) {
+ LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+ "ERROR.DISPATCH.THREAD", LOG).toString(), ex);
+ }
+ LOG.fine(new org.apache.cxf.common.i18n.Message(
+ "JBI.SERVER.TRANSPORT.MESSAGE.PROCESS.THREAD.EXIT",
LOG).toString());
+ }
+
+ }
+
+ private void dispatch(MessageExchange exchange) throws IOException {
+ QName opName = exchange.getOperation();
+ LOG.fine("dispatch method: " + opName);
+
+ NormalizedMessage nm = exchange.getMessage("in");
+ try {
+ final InputStream in =
JBIMessageHelper.convertMessageToInputStream(nm.getContent());
+ //get the message to be interceptor
+ MessageImpl inMessage = new MessageImpl();
+ inMessage.put(JBIConstants.MESSAGE_EXCHANGE_PROPERTY, exchange);
+ inMessage.setContent(InputStream.class, in);
+
+ inMessage.setDestination(this);
+ //handle the incoming message
+ incomingObserver.onMessage(inMessage);
+ } catch (Exception ex) {
+ LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+ "ERROR.PREPARE.MESSAGE", LOG).toString(), ex);
+ throw new IOException(ex.getMessage());
+ }
+
+ }
}
Added:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java?view=auto&rev=468334
==============================================================================
---
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
(added)
+++
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
Fri Oct 27 02:48:01 2006
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.dom.DOMSource;
+
+import org.w3c.dom.Document;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+
+public class JBIDestinationOutputStream extends AbstractCachedOutputStream {
+
+ private static final Logger LOG =
LogUtils.getL7dLogger(JBIDestinationOutputStream.class);
+ private Message inMessage;
+ private DeliveryChannel channel;
+
+ public JBIDestinationOutputStream(Message m,
+ DeliveryChannel dc) {
+ super();
+ inMessage = m;
+ channel = dc;
+ }
+
+ @Override
+ protected void doFlush() throws IOException {
+ // so far do nothing
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ commitOutputMessage();
+ }
+
+ @Override
+ protected void onWrite() throws IOException {
+ // so far do nothing
+ }
+
+ private void commitOutputMessage() throws IOException {
+ try {
+ if (inMessage.getExchange().isOneWay()) {
+ return;
+ } else {
+
+ ByteArrayOutputStream baos = (ByteArrayOutputStream)getOut();
+ ByteArrayInputStream bais = new
ByteArrayInputStream(baos.toByteArray());
+ LOG.finest(new org.apache.cxf.common.i18n.Message(
+ "BUILDING.DOCUMENT", LOG).toString());
+ DocumentBuilderFactory docBuilderFactory =
DocumentBuilderFactory.newInstance();
+ DocumentBuilder builder =
docBuilderFactory.newDocumentBuilder();
+ Document doc = builder.parse(bais);
+
+ MessageExchange xchng = (MessageExchange)inMessage.get(
+ JBIConstants.MESSAGE_EXCHANGE_PROPERTY);
+ LOG.fine(new org.apache.cxf.common.i18n.Message(
+ "CREATE.NORMALIZED.MESSAGE", LOG).toString());
+ NormalizedMessage msg = xchng.createMessage();
+ msg.setContent(new DOMSource(doc));
+ LOG.info("the message class is " + xchng.getClass().getName());
+ xchng.setMessage(msg, "out");
+ LOG.fine(new org.apache.cxf.common.i18n.Message(
+ "POST.DISPATCH", LOG).toString());
+ channel.send(xchng);
+ }
+ } catch (Exception ex) {
+ LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+ "ERROR.SEND.MESSAGE", LOG).toString(), ex);
+ }
+ }
+
+}
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java?view=auto&rev=468334
==============================================================================
---
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
(added)
+++
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
Fri Oct 27 02:48:01 2006
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Logger;
+
+import javax.xml.transform.Source;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+
+public final class JBIMessageHelper {
+
+ private static final Logger LOG =
LogUtils.getL7dLogger(JBIMessageHelper.class);
+
+ private static final TransformerFactory TRANSFORMER_FACTORY
+ = TransformerFactory.newInstance();
+
+ private JBIMessageHelper() {
+ // complete
+ }
+
+
+ public static InputStream convertMessageToInputStream(Source src)
+ throws IOException, TransformerConfigurationException,
TransformerException {
+
+ final Transformer transformer = TRANSFORMER_FACTORY.newTransformer();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ StreamResult result = new StreamResult(baos);
+ transformer.transform(src, result);
+ LOG.finest(new Message("RECEIVED.MESSAGE", LOG) + new
String(baos.toByteArray()));
+
+ return new ByteArrayInputStream(baos.toByteArray());
+ }
+}
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java?view=diff&rev=468334&r1=468333&r2=468334
==============================================================================
---
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java
(original)
+++
incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java
Fri Oct 27 02:48:01 2006
@@ -21,7 +21,15 @@
package org.apache.cxf.jbi.transport;
import java.io.IOException;
+import java.util.logging.Logger;
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.jbi.se.CXFServiceUnitManager;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractTransportFactory;
import org.apache.cxf.transport.Conduit;
@@ -32,20 +40,65 @@
public class JBITransportFactory extends AbstractTransportFactory implements
ConduitInitiator,
DestinationFactory {
+
+ private static final Logger LOG =
LogUtils.getL7dLogger(JBITransportFactory.class);
+
+ private CXFServiceUnitManager suManager;
+ private DeliveryChannel deliveryChannel;
+ private Bus bus;
+
+ @Resource
+ public void setBus(Bus b) {
+ bus = b;
+ }
+
+ public Bus getBus() {
+ return bus;
+ }
+
+ public DeliveryChannel getDeliveryChannel() {
+ return deliveryChannel;
+ }
+
+ public void setDeliveryChannel(DeliveryChannel newDeliverychannel) {
+ LOG.fine(new org.apache.cxf.common.i18n.Message(
+ "CONFIG.DELIVERY.CHANNEL", LOG).toString() + newDeliverychannel);
+ deliveryChannel = newDeliverychannel;
+ }
+
+ public CXFServiceUnitManager getServiceUnitManager() {
+ return suManager;
+ }
+
+ public void setServiceUnitManager(CXFServiceUnitManager sum) {
+ if (sum == null) {
+ Thread.dumpStack();
+ }
+ LOG.fine(new org.apache.cxf.common.i18n.Message(
+ "CONFIG.SU.MANAGER", LOG).toString() + sum);
+ suManager = sum;
+ }
public Conduit getConduit(EndpointInfo targetInfo) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ return getConduit(targetInfo, null);
}
- public Conduit getConduit(EndpointInfo localInfo, EndpointReferenceType
target) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType
target) throws IOException {
+ Conduit conduit = new JBIConduit(target, deliveryChannel);
+ Configurer configurer = bus.getExtension(Configurer.class);
+ if (null != configurer) {
+ configurer.configureBean(conduit);
+ }
+ return conduit;
}
public Destination getDestination(EndpointInfo ei) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ JBIDestination destination = new JBIDestination(this, ei,
deliveryChannel, suManager);
+ Configurer configurer = bus.getExtension(Configurer.class);
+ if (null != configurer) {
+ configurer.configureBean(destination);
+ }
+ return destination;
}
}