Amila, the classes in org.apache.axis2.transport.base.datagram should not refer to SocketAddress, since that type is specific to UDP. Can you please think about a better solution?
Andreas On Fri, Dec 4, 2009 at 09:33, <ami...@apache.org> wrote: > Author: amilas > Date: Fri Dec 4 08:32:41 2009 > New Revision: 887108 > > URL: http://svn.apache.org/viewvc?rev=887108&view=rev > Log: > apply the patch for WSCOMMONS-511 > > Added: > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > Modified: > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > > Modified: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=887108&r1=887107&r2=887108&view=diff > ============================================================================== > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > (original) > +++ > 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > Fri Dec 4 08:32:41 2009 > @@ -20,6 +20,8 @@ > > import java.io.IOException; > import java.net.SocketException; > +import java.net.SocketAddress; > +import java.nio.channels.DatagramChannel; > > import org.apache.axis2.AxisFault; > import org.apache.axis2.context.ConfigurationContext; > @@ -38,8 +40,12 @@ > > super.init(cfgCtx, transportIn); > DatagramDispatcherCallback callback = new > DatagramDispatcherCallback() { > - public void receive(DatagramEndpoint endpoint, byte[] data, int > length) { > - workerPool.execute(new ProcessPacketTask(endpoint, data, > length)); > + > + public void receive(SocketAddress address, > + DatagramEndpoint endpoint, > + byte[] data, > + int length) { > + workerPool.execute(new ProcessPacketTask(address, endpoint, > data, length)); > } > }; > try { > > Modified: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java?rev=887108&r1=887107&r2=887108&view=diff > ============================================================================== > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > (original) > +++ > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > Fri Dec 4 08:32:41 2009 > @@ -18,6 +18,12 @@ > */ > package org.apache.axis2.transport.base.datagram; > > +import java.nio.channels.DatagramChannel; > +import java.net.SocketAddress; > + > public interface DatagramDispatcherCallback { > - void receive(DatagramEndpoint 
endpoint, byte[] data, int length); > + void receive(SocketAddress address, > + DatagramEndpoint endpoint, > + byte[] data, > + int length); > } > > Added: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java?rev=887108&view=auto > ============================================================================== > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > (added) > +++ > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > Fri Dec 4 08:32:41 2009 > @@ -0,0 +1,44 @@ > +/* > + * Copyright 2004,2005 The Apache Software Foundation. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +package org.apache.axis2.transport.base.datagram; > + > +import org.apache.axis2.transport.OutTransportInfo; > + > +import java.nio.channels.DatagramChannel; > +import java.net.SocketAddress; > + > +public class DatagramOutTransportInfo implements OutTransportInfo { > + //out transport for back chanel > + protected SocketAddress sourceAddress; > + protected String contentType; > + > + public SocketAddress getSourceAddress() { > + return sourceAddress; > + } > + > + public void setSourceAddress(SocketAddress sourceAddress) { > + this.sourceAddress = sourceAddress; > + } > + > + public String getContentType() { > + return contentType; > + } > + > + public void setContentType(String contentType) { > + this.contentType = contentType; > + } > +} > > Modified: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java?rev=887108&r1=887107&r2=887108&view=diff > ============================================================================== > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > (original) > +++ > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > Fri Dec 4 08:32:41 2009 > @@ -20,6 +20,8 @@ > > import java.io.ByteArrayInputStream; > import java.io.InputStream; > +import java.nio.channels.DatagramChannel; > +import java.net.SocketAddress; > > import org.apache.axiom.soap.SOAPEnvelope; > import org.apache.axis2.context.MessageContext; > @@ -28,6 +30,7 @@ > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import org.apache.axis2.transport.base.MetricsCollector; > +import 
org.apache.axis2.Constants; > > /** > * Task encapsulating the processing of a datagram. > @@ -40,11 +43,21 @@ > private final DatagramEndpoint endpoint; > private final byte[] data; > private final int length; > + > + //back channel data > + private DatagramChannel datagramChannel; > + private SocketAddress address; > > - public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int > length) { > + public ProcessPacketTask(SocketAddress address, > + DatagramEndpoint endpoint, > + byte[] data, > + int length) { > this.endpoint = endpoint; > this.data = data; > this.length = length; > + > + this.datagramChannel = datagramChannel; > + this.address = address; > } > > public void run() { > @@ -54,6 +67,14 @@ > MessageContext msgContext = endpoint.createMessageContext(); > SOAPEnvelope envelope = > TransportUtils.createSOAPMessage(msgContext, inputStream, > endpoint.getContentType()); > msgContext.setEnvelope(envelope); > + > + //create and out transport info object > + DatagramOutTransportInfo datagramOutTransportInfo = new > DatagramOutTransportInfo(); > + > datagramOutTransportInfo.setContentType(endpoint.getContentType()); > + datagramOutTransportInfo.setSourceAddress(address); > + > + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, > datagramOutTransportInfo); > + > AxisEngine.receive(msgContext); > metrics.incrementMessagesReceived(); > metrics.incrementBytesReceived(length); > > Modified: > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java?rev=887108&r1=887107&r2=887108&view=diff > ============================================================================== > --- > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > (original) > +++ > 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > Fri Dec 4 08:32:41 2009 > @@ -252,7 +252,7 @@ > if (log.isDebugEnabled()) { > log.debug("Received packet from " + address + " with length " > + length); > } > - callback.receive(endpoint, data, length); > + callback.receive(address, endpoint, data, length); > } catch (IOException ex) { > endpoint.getMetrics().incrementFaultsReceiving(); > log.error("Error receiving UDP packet", ex); > > Modified: > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java?rev=887108&r1=887107&r2=887108&view=diff > ============================================================================== > --- > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > (original) > +++ > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > Fri Dec 4 08:32:41 2009 > @@ -23,15 +23,15 @@ > > import org.apache.axis2.AxisFault; > import org.apache.axis2.transport.OutTransportInfo; > +import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo; > > /** > * Holder of information to send an outgoing message to a UDP destination. 
> */ > -public class UDPOutTransportInfo implements OutTransportInfo { > +public class UDPOutTransportInfo extends DatagramOutTransportInfo { > private String host; > private int port; > - private String contentType; > - > + > public UDPOutTransportInfo(String eprString) throws AxisFault { > URI epr; > try { > @@ -65,12 +65,4 @@ > public void setPort(int port) { > this.port = port; > } > - > - public String getContentType() { > - return contentType; > - } > - > - public void setContentType(String contentType) { > - this.contentType = contentType; > - } > } > > Modified: > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java?rev=887108&r1=887107&r2=887108&view=diff > ============================================================================== > --- > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > (original) > +++ > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > Fri Dec 4 08:32:41 2009 > @@ -19,22 +19,33 @@ > package org.apache.axis2.transport.udp; > > import java.io.IOException; > +import java.io.InputStream; > +import java.io.ByteArrayInputStream; > import java.net.DatagramPacket; > import java.net.DatagramSocket; > import java.net.InetAddress; > +import java.net.SocketAddress; > +import java.nio.channels.DatagramChannel; > +import java.nio.ByteBuffer; > > import org.apache.axiom.om.OMOutputFormat; > +import org.apache.axiom.soap.SOAPEnvelope; > import org.apache.axis2.AxisFault; > import org.apache.axis2.context.ConfigurationContext; > import org.apache.axis2.context.MessageContext; > import org.apache.axis2.description.TransportOutDescription; > +import org.apache.axis2.description.WSDL2Constants; > +import 
org.apache.axis2.description.OutInAxisOperation; > import org.apache.axis2.transport.MessageFormatter; > import org.apache.axis2.transport.OutTransportInfo; > import org.apache.axis2.transport.TransportUtils; > import org.apache.axis2.transport.base.AbstractTransportSender; > import org.apache.axis2.transport.base.BaseUtils; > +import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo; > import org.apache.commons.logging.LogFactory; > > +import javax.xml.stream.XMLStreamException; > + > /** > * Transport sender for the UDP protocol. > * > @@ -52,22 +63,72 @@ > > @Override > public void sendMessage(MessageContext msgContext, String targetEPR, > OutTransportInfo outTransportInfo) throws AxisFault { > - UDPOutTransportInfo udpOutInfo = new UDPOutTransportInfo(targetEPR); > - MessageFormatter messageFormatter = > TransportUtils.getMessageFormatter(msgContext); > - OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); > - format.setContentType(udpOutInfo.getContentType()); > - byte[] payload = messageFormatter.getBytes(msgContext, format); > - try { > - DatagramSocket socket = new DatagramSocket(); > + if ((targetEPR == null) && (outTransportInfo != null)) { > + // this can happen only at the server side and send the message > using back chanel > + DatagramOutTransportInfo datagramOutTransportInfo = > (DatagramOutTransportInfo) outTransportInfo; > + MessageFormatter messageFormatter = > TransportUtils.getMessageFormatter(msgContext); > + OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); > + format.setContentType(datagramOutTransportInfo.getContentType()); > + byte[] payload = messageFormatter.getBytes(msgContext, format); > + > + ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length); > + byteBuffer.put(payload); > + > + DatagramSocket socket = null; > try { > - socket.send(new DatagramPacket(payload, payload.length, > InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort())); > - } > - finally { > + socket = new 
DatagramSocket(); > + socket.send(new DatagramPacket(payload, payload.length, > datagramOutTransportInfo.getSourceAddress())); > + } catch (IOException e) { > + throw new AxisFault("Unable to send packet", e); > + } finally { > socket.close(); > } > + > + } else { > + UDPOutTransportInfo udpOutInfo = new > UDPOutTransportInfo(targetEPR); > + MessageFormatter messageFormatter = > TransportUtils.getMessageFormatter(msgContext); > + OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); > + format.setContentType(udpOutInfo.getContentType()); > + byte[] payload = messageFormatter.getBytes(msgContext, format); > + try { > + DatagramSocket socket = new DatagramSocket(); > + try { > + socket.send(new DatagramPacket(payload, payload.length, > InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort())); > + if (!msgContext.getOptions().isUseSeparateListener() && > !msgContext.isServerSide()){ > + waitForReply(msgContext, socket, > udpOutInfo.getContentType()); > + } > + } > + finally { > + socket.close(); > + } > + } > + catch (IOException ex) { > + throw new AxisFault("Unable to send packet", ex); > + } > + } > + } > + > + private void waitForReply(MessageContext messageContext, DatagramSocket > datagramSocket, String contentType) throws IOException { > + > + // piggy back message constant is used to pass a piggy back > + // message context in asnych model > + if (!(messageContext.getAxisOperation() instanceof > OutInAxisOperation) && > + > (messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == > null)) { > + return; > } > - catch (IOException ex) { > - throw new AxisFault("Unable to send packet", ex); > + > + byte[] inputBuffer = new byte[4096]; //TODO set the maximum size > parameter > + DatagramPacket packet = new DatagramPacket(inputBuffer, > inputBuffer.length); > + datagramSocket.receive(packet); > + > + // create the soap envelope > + try { > + MessageContext respMessageContext = > 
messageContext.getOperationContext().getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN); > + InputStream inputStream = new ByteArrayInputStream(inputBuffer, > 0, inputBuffer.length); > + SOAPEnvelope envelope = > TransportUtils.createSOAPMessage(respMessageContext, inputStream, > contentType); > + respMessageContext.setEnvelope(envelope); > + } catch (XMLStreamException e) { > + throw new AxisFault("Can not build the soap message ", e); > } > } > } > > >