Author: amilas Date: Fri Dec 4 08:32:41 2009 New Revision: 887108 URL: http://svn.apache.org/viewvc?rev=887108&view=rev Log: apply the patch for WSCOMMONS-511
Added: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=887108&r1=887107&r2=887108&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java Fri Dec 4 08:32:41 2009 @@ -20,6 +20,8 @@ import java.io.IOException; import java.net.SocketException; +import java.net.SocketAddress; +import java.nio.channels.DatagramChannel; import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; @@ -38,8 +40,12 @@ super.init(cfgCtx, transportIn); DatagramDispatcherCallback callback = new DatagramDispatcherCallback() { - public void receive(DatagramEndpoint endpoint, byte[] data, int length) { - workerPool.execute(new ProcessPacketTask(endpoint, data, length)); + + public void receive(SocketAddress address, + DatagramEndpoint endpoint, + byte[] data, + int length) { + workerPool.execute(new ProcessPacketTask(address, endpoint, data, length)); } }; try { Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java?rev=887108&r1=887107&r2=887108&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java Fri Dec 4 08:32:41 2009 @@ -18,6 +18,12 @@ */ package org.apache.axis2.transport.base.datagram; +import java.nio.channels.DatagramChannel; +import java.net.SocketAddress; + public interface DatagramDispatcherCallback { - void receive(DatagramEndpoint endpoint, byte[] data, int length); + void receive(SocketAddress address, + DatagramEndpoint endpoint, + byte[] data, + int length); } Added: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java?rev=887108&view=auto ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java (added) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java Fri Dec 4 08:32:41 2009 @@ -0,0 +1,44 @@ +/* + * Copyright 2004,2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.axis2.transport.base.datagram; + +import org.apache.axis2.transport.OutTransportInfo; + +import java.nio.channels.DatagramChannel; +import java.net.SocketAddress; + +public class DatagramOutTransportInfo implements OutTransportInfo { + //out transport for back chanel + protected SocketAddress sourceAddress; + protected String contentType; + + public SocketAddress getSourceAddress() { + return sourceAddress; + } + + public void setSourceAddress(SocketAddress sourceAddress) { + this.sourceAddress = sourceAddress; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } +} Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java?rev=887108&r1=887107&r2=887108&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java Fri Dec 4 08:32:41 2009 @@ -20,6 +20,8 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.nio.channels.DatagramChannel; +import java.net.SocketAddress; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axis2.context.MessageContext; @@ -28,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.axis2.transport.base.MetricsCollector; +import org.apache.axis2.Constants; /** * Task encapsulating the processing of a datagram. @@ -40,11 +43,21 @@ private final DatagramEndpoint endpoint; private final byte[] data; private final int length; + + //back channel data + private DatagramChannel datagramChannel; + private SocketAddress address; - public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int length) { + public ProcessPacketTask(SocketAddress address, + DatagramEndpoint endpoint, + byte[] data, + int length) { this.endpoint = endpoint; this.data = data; this.length = length; + + this.datagramChannel = datagramChannel; + this.address = address; } public void run() { @@ -54,6 +67,14 @@ MessageContext msgContext = endpoint.createMessageContext(); SOAPEnvelope envelope = TransportUtils.createSOAPMessage(msgContext, inputStream, endpoint.getContentType()); msgContext.setEnvelope(envelope); + + //create and out transport info object + DatagramOutTransportInfo datagramOutTransportInfo = new DatagramOutTransportInfo(); + datagramOutTransportInfo.setContentType(endpoint.getContentType()); + datagramOutTransportInfo.setSourceAddress(address); + + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, datagramOutTransportInfo); + AxisEngine.receive(msgContext); metrics.incrementMessagesReceived(); metrics.incrementBytesReceived(length); Modified: webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java?rev=887108&r1=887107&r2=887108&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java (original) +++ webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java Fri Dec 4 08:32:41 2009 @@ -252,7 +252,7 @@ if (log.isDebugEnabled()) { log.debug("Received packet from " + address + " with length " + length); } - callback.receive(endpoint, data, length); + callback.receive(address, endpoint, data, length); } catch (IOException ex) { endpoint.getMetrics().incrementFaultsReceiving(); log.error("Error receiving UDP packet", ex); Modified: webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java?rev=887108&r1=887107&r2=887108&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java (original) +++ webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java Fri Dec 4 08:32:41 2009 @@ -23,15 +23,15 @@ import org.apache.axis2.AxisFault; import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo; /** * Holder of information to send an outgoing message to a UDP destination. */ -public class UDPOutTransportInfo implements OutTransportInfo { +public class UDPOutTransportInfo extends DatagramOutTransportInfo { private String host; private int port; - private String contentType; - + public UDPOutTransportInfo(String eprString) throws AxisFault { URI epr; try { @@ -65,12 +65,4 @@ public void setPort(int port) { this.port = port; } - - public String getContentType() { - return contentType; - } - - public void setContentType(String contentType) { - this.contentType = contentType; - } } Modified: webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java?rev=887108&r1=887107&r2=887108&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java (original) +++ webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java Fri Dec 4 08:32:41 2009 @@ -19,22 +19,33 @@ package org.apache.axis2.transport.udp; import java.io.IOException; +import java.io.InputStream; +import java.io.ByteArrayInputStream; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.SocketAddress; +import java.nio.channels.DatagramChannel; +import java.nio.ByteBuffer; import org.apache.axiom.om.OMOutputFormat; +import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.description.WSDL2Constants; +import org.apache.axis2.description.OutInAxisOperation; import org.apache.axis2.transport.MessageFormatter; import org.apache.axis2.transport.OutTransportInfo; import org.apache.axis2.transport.TransportUtils; import org.apache.axis2.transport.base.AbstractTransportSender; import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo; import org.apache.commons.logging.LogFactory; +import javax.xml.stream.XMLStreamException; + /** * Transport sender for the UDP protocol. * @@ -52,22 +63,72 @@ @Override public void sendMessage(MessageContext msgContext, String targetEPR, OutTransportInfo outTransportInfo) throws AxisFault { - UDPOutTransportInfo udpOutInfo = new UDPOutTransportInfo(targetEPR); - MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext); - OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); - format.setContentType(udpOutInfo.getContentType()); - byte[] payload = messageFormatter.getBytes(msgContext, format); - try { - DatagramSocket socket = new DatagramSocket(); + if ((targetEPR == null) && (outTransportInfo != null)) { + // this can happen only at the server side and send the message using back chanel + DatagramOutTransportInfo datagramOutTransportInfo = (DatagramOutTransportInfo) outTransportInfo; + MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext); + OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); + format.setContentType(datagramOutTransportInfo.getContentType()); + byte[] payload = messageFormatter.getBytes(msgContext, format); + + ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length); + byteBuffer.put(payload); + + DatagramSocket socket = null; try { - socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort())); - } - finally { + socket = new DatagramSocket(); + socket.send(new DatagramPacket(payload, payload.length, datagramOutTransportInfo.getSourceAddress())); + } catch (IOException e) { + throw new AxisFault("Unable to send packet", e); + } finally { socket.close(); } + + } else { + UDPOutTransportInfo udpOutInfo = new UDPOutTransportInfo(targetEPR); + MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext); + OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); + format.setContentType(udpOutInfo.getContentType()); + byte[] payload = messageFormatter.getBytes(msgContext, format); + try { + DatagramSocket socket = new DatagramSocket(); + try { + socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort())); + if (!msgContext.getOptions().isUseSeparateListener() && !msgContext.isServerSide()){ + waitForReply(msgContext, socket, udpOutInfo.getContentType()); + } + } + finally { + socket.close(); + } + } + catch (IOException ex) { + throw new AxisFault("Unable to send packet", ex); + } + } + } + + private void waitForReply(MessageContext messageContext, DatagramSocket datagramSocket, String contentType) throws IOException { + + // piggy back message constant is used to pass a piggy back + // message context in asnych model + if (!(messageContext.getAxisOperation() instanceof OutInAxisOperation) && + (messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == null)) { + return; } - catch (IOException ex) { - throw new AxisFault("Unable to send packet", ex); + + byte[] inputBuffer = new byte[4096]; //TODO set the maximum size parameter + DatagramPacket packet = new DatagramPacket(inputBuffer, inputBuffer.length); + datagramSocket.receive(packet); + + // create the soap envelope + try { + MessageContext respMessageContext = messageContext.getOperationContext().getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN); + InputStream inputStream = new ByteArrayInputStream(inputBuffer, 0, inputBuffer.length); + SOAPEnvelope envelope = TransportUtils.createSOAPMessage(respMessageContext, inputStream, contentType); + respMessageContext.setEnvelope(envelope); + } catch (XMLStreamException e) { + throw new AxisFault("Can not build the soap message ", e); } } }