On Tue, Dec 22, 2009 at 12:57 AM, Andreas Veithen <andreas.veit...@gmail.com> wrote:
> Amila, > > The classes in org.apache.axis2.transport.base.datagram should not > refer to SocketAddress, since this is something specific to UDP. Can > you please think about a better solution? > Will see. In that case I think most of the stuff in base.datagram will have to move to the udp transport module. Thanks, Amila. > > Andreas > > On Fri, Dec 4, 2009 at 09:33, <ami...@apache.org> wrote: > > Author: amilas > > Date: Fri Dec 4 08:32:41 2009 > > New Revision: 887108 > > > > URL: http://svn.apache.org/viewvc?rev=887108&view=rev > > Log: > > apply the patch for WSCOMMONS-511 > > > > Added: > > > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > > Modified: > > > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > > > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > > > > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > > > > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > > > > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > > > > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > > > > Modified: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=887108&r1=887107&r2=887108&view=diff > 
> > ============================================================================== > > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > (original) > > +++ > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java > Fri Dec 4 08:32:41 2009 > > @@ -20,6 +20,8 @@ > > > > import java.io.IOException; > > import java.net.SocketException; > > +import java.net.SocketAddress; > > +import java.nio.channels.DatagramChannel; > > > > import org.apache.axis2.AxisFault; > > import org.apache.axis2.context.ConfigurationContext; > > @@ -38,8 +40,12 @@ > > > > super.init(cfgCtx, transportIn); > > DatagramDispatcherCallback callback = new > DatagramDispatcherCallback() { > > - public void receive(DatagramEndpoint endpoint, byte[] data, > int length) { > > - workerPool.execute(new ProcessPacketTask(endpoint, data, > length)); > > + > > + public void receive(SocketAddress address, > > + DatagramEndpoint endpoint, > > + byte[] data, > > + int length) { > > + workerPool.execute(new ProcessPacketTask(address, > endpoint, data, length)); > > } > > }; > > try { > > > > Modified: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java?rev=887108&r1=887107&r2=887108&view=diff > > > ============================================================================== > > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > (original) > > +++ > 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java > Fri Dec 4 08:32:41 2009 > > @@ -18,6 +18,12 @@ > > */ > > package org.apache.axis2.transport.base.datagram; > > > > +import java.nio.channels.DatagramChannel; > > +import java.net.SocketAddress; > > + > > public interface DatagramDispatcherCallback { > > - void receive(DatagramEndpoint endpoint, byte[] data, int length); > > + void receive(SocketAddress address, > > + DatagramEndpoint endpoint, > > + byte[] data, > > + int length); > > } > > > > Added: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java?rev=887108&view=auto > > > ============================================================================== > > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > (added) > > +++ > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java > Fri Dec 4 08:32:41 2009 > > @@ -0,0 +1,44 @@ > > +/* > > + * Copyright 2004,2005 The Apache Software Foundation. > > + * > > + * Licensed under the Apache License, Version 2.0 (the "License"); > > + * you may not use this file except in compliance with the License. > > + * You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. 
> > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > + > > +package org.apache.axis2.transport.base.datagram; > > + > > +import org.apache.axis2.transport.OutTransportInfo; > > + > > +import java.nio.channels.DatagramChannel; > > +import java.net.SocketAddress; > > + > > +public class DatagramOutTransportInfo implements OutTransportInfo { > > + //out transport for back chanel > > + protected SocketAddress sourceAddress; > > + protected String contentType; > > + > > + public SocketAddress getSourceAddress() { > > + return sourceAddress; > > + } > > + > > + public void setSourceAddress(SocketAddress sourceAddress) { > > + this.sourceAddress = sourceAddress; > > + } > > + > > + public String getContentType() { > > + return contentType; > > + } > > + > > + public void setContentType(String contentType) { > > + this.contentType = contentType; > > + } > > +} > > > > Modified: > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java?rev=887108&r1=887107&r2=887108&view=diff > > > ============================================================================== > > --- > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > (original) > > +++ > webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java > Fri Dec 4 08:32:41 2009 > > @@ -20,6 +20,8 @@ > > > > import java.io.ByteArrayInputStream; > > import java.io.InputStream; > > +import java.nio.channels.DatagramChannel; > > +import java.net.SocketAddress; > > > > import org.apache.axiom.soap.SOAPEnvelope; > > import 
org.apache.axis2.context.MessageContext; > > @@ -28,6 +30,7 @@ > > import org.apache.commons.logging.Log; > > import org.apache.commons.logging.LogFactory; > > import org.apache.axis2.transport.base.MetricsCollector; > > +import org.apache.axis2.Constants; > > > > /** > > * Task encapsulating the processing of a datagram. > > @@ -40,11 +43,21 @@ > > private final DatagramEndpoint endpoint; > > private final byte[] data; > > private final int length; > > + > > + //back channel data > > + private DatagramChannel datagramChannel; > > + private SocketAddress address; > > > > - public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int > length) { > > + public ProcessPacketTask(SocketAddress address, > > + DatagramEndpoint endpoint, > > + byte[] data, > > + int length) { > > this.endpoint = endpoint; > > this.data = data; > > this.length = length; > > + > > + this.datagramChannel = datagramChannel; > > + this.address = address; > > } > > > > public void run() { > > @@ -54,6 +67,14 @@ > > MessageContext msgContext = endpoint.createMessageContext(); > > SOAPEnvelope envelope = > TransportUtils.createSOAPMessage(msgContext, inputStream, > endpoint.getContentType()); > > msgContext.setEnvelope(envelope); > > + > > + //create and out transport info object > > + DatagramOutTransportInfo datagramOutTransportInfo = new > DatagramOutTransportInfo(); > > + > datagramOutTransportInfo.setContentType(endpoint.getContentType()); > > + datagramOutTransportInfo.setSourceAddress(address); > > + > > + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, > datagramOutTransportInfo); > > + > > AxisEngine.receive(msgContext); > > metrics.incrementMessagesReceived(); > > metrics.incrementBytesReceived(length); > > > > Modified: > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > > URL: > 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java?rev=887108&r1=887107&r2=887108&view=diff > > > ============================================================================== > > --- > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > (original) > > +++ > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java > Fri Dec 4 08:32:41 2009 > > @@ -252,7 +252,7 @@ > > if (log.isDebugEnabled()) { > > log.debug("Received packet from " + address + " with > length " + length); > > } > > - callback.receive(endpoint, data, length); > > + callback.receive(address, endpoint, data, length); > > } catch (IOException ex) { > > endpoint.getMetrics().incrementFaultsReceiving(); > > log.error("Error receiving UDP packet", ex); > > > > Modified: > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java?rev=887108&r1=887107&r2=887108&view=diff > > > ============================================================================== > > --- > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > (original) > > +++ > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java > Fri Dec 4 08:32:41 2009 > > @@ -23,15 +23,15 @@ > > > > import org.apache.axis2.AxisFault; > > import org.apache.axis2.transport.OutTransportInfo; > > +import > org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo; > > > > /** > > * Holder of information to send an outgoing message to a UDP > destination. 
> > */ > > -public class UDPOutTransportInfo implements OutTransportInfo { > > +public class UDPOutTransportInfo extends DatagramOutTransportInfo { > > private String host; > > private int port; > > - private String contentType; > > - > > + > > public UDPOutTransportInfo(String eprString) throws AxisFault { > > URI epr; > > try { > > @@ -65,12 +65,4 @@ > > public void setPort(int port) { > > this.port = port; > > } > > - > > - public String getContentType() { > > - return contentType; > > - } > > - > > - public void setContentType(String contentType) { > > - this.contentType = contentType; > > - } > > } > > > > Modified: > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > > URL: > http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java?rev=887108&r1=887107&r2=887108&view=diff > > > ============================================================================== > > --- > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > (original) > > +++ > webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java > Fri Dec 4 08:32:41 2009 > > @@ -19,22 +19,33 @@ > > package org.apache.axis2.transport.udp; > > > > import java.io.IOException; > > +import java.io.InputStream; > > +import java.io.ByteArrayInputStream; > > import java.net.DatagramPacket; > > import java.net.DatagramSocket; > > import java.net.InetAddress; > > +import java.net.SocketAddress; > > +import java.nio.channels.DatagramChannel; > > +import java.nio.ByteBuffer; > > > > import org.apache.axiom.om.OMOutputFormat; > > +import org.apache.axiom.soap.SOAPEnvelope; > > import org.apache.axis2.AxisFault; > > import org.apache.axis2.context.ConfigurationContext; > > import org.apache.axis2.context.MessageContext; > > import 
org.apache.axis2.description.TransportOutDescription; > > +import org.apache.axis2.description.WSDL2Constants; > > +import org.apache.axis2.description.OutInAxisOperation; > > import org.apache.axis2.transport.MessageFormatter; > > import org.apache.axis2.transport.OutTransportInfo; > > import org.apache.axis2.transport.TransportUtils; > > import org.apache.axis2.transport.base.AbstractTransportSender; > > import org.apache.axis2.transport.base.BaseUtils; > > +import > org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo; > > import org.apache.commons.logging.LogFactory; > > > > +import javax.xml.stream.XMLStreamException; > > + > > /** > > * Transport sender for the UDP protocol. > > * > > @@ -52,22 +63,72 @@ > > > > @Override > > public void sendMessage(MessageContext msgContext, String targetEPR, > OutTransportInfo outTransportInfo) throws AxisFault { > > - UDPOutTransportInfo udpOutInfo = new > UDPOutTransportInfo(targetEPR); > > - MessageFormatter messageFormatter = > TransportUtils.getMessageFormatter(msgContext); > > - OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); > > - format.setContentType(udpOutInfo.getContentType()); > > - byte[] payload = messageFormatter.getBytes(msgContext, format); > > - try { > > - DatagramSocket socket = new DatagramSocket(); > > + if ((targetEPR == null) && (outTransportInfo != null)) { > > + // this can happen only at the server side and send the > message using back chanel > > + DatagramOutTransportInfo datagramOutTransportInfo = > (DatagramOutTransportInfo) outTransportInfo; > > + MessageFormatter messageFormatter = > TransportUtils.getMessageFormatter(msgContext); > > + OMOutputFormat format = > BaseUtils.getOMOutputFormat(msgContext); > > + > format.setContentType(datagramOutTransportInfo.getContentType()); > > + byte[] payload = messageFormatter.getBytes(msgContext, > format); > > + > > + ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length); > > + byteBuffer.put(payload); > > + > > + 
DatagramSocket socket = null; > > try { > > - socket.send(new DatagramPacket(payload, payload.length, > InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort())); > > - } > > - finally { > > + socket = new DatagramSocket(); > > + socket.send(new DatagramPacket(payload, payload.length, > datagramOutTransportInfo.getSourceAddress())); > > + } catch (IOException e) { > > + throw new AxisFault("Unable to send packet", e); > > + } finally { > > socket.close(); > > } > > + > > + } else { > > + UDPOutTransportInfo udpOutInfo = new > UDPOutTransportInfo(targetEPR); > > + MessageFormatter messageFormatter = > TransportUtils.getMessageFormatter(msgContext); > > + OMOutputFormat format = > BaseUtils.getOMOutputFormat(msgContext); > > + format.setContentType(udpOutInfo.getContentType()); > > + byte[] payload = messageFormatter.getBytes(msgContext, > format); > > + try { > > + DatagramSocket socket = new DatagramSocket(); > > + try { > > + socket.send(new DatagramPacket(payload, > payload.length, InetAddress.getByName(udpOutInfo.getHost()), > udpOutInfo.getPort())); > > + if (!msgContext.getOptions().isUseSeparateListener() > && !msgContext.isServerSide()){ > > + waitForReply(msgContext, socket, > udpOutInfo.getContentType()); > > + } > > + } > > + finally { > > + socket.close(); > > + } > > + } > > + catch (IOException ex) { > > + throw new AxisFault("Unable to send packet", ex); > > + } > > + } > > + } > > + > > + private void waitForReply(MessageContext messageContext, > DatagramSocket datagramSocket, String contentType) throws IOException { > > + > > + // piggy back message constant is used to pass a piggy back > > + // message context in asnych model > > + if (!(messageContext.getAxisOperation() instanceof > OutInAxisOperation) && > > + > (messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) > == null)) { > > + return; > > } > > - catch (IOException ex) { > > - throw new AxisFault("Unable to send packet", ex); > > + > > + byte[] inputBuffer = 
new byte[4096]; //TODO set the maximum size > parameter > > + DatagramPacket packet = new DatagramPacket(inputBuffer, > inputBuffer.length); > > + datagramSocket.receive(packet); > > + > > + // create the soap envelope > > + try { > > + MessageContext respMessageContext = > messageContext.getOperationContext().getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN); > > + InputStream inputStream = new > ByteArrayInputStream(inputBuffer, 0, inputBuffer.length); > > + SOAPEnvelope envelope = > TransportUtils.createSOAPMessage(respMessageContext, inputStream, > contentType); > > + respMessageContext.setEnvelope(envelope); > > + } catch (XMLStreamException e) { > > + throw new AxisFault("Can not build the soap message ", e); > > } > > } > > } > > > > > > > -- Amila Suriarachchi WSO2 Inc. blog: http://amilachinthaka.blogspot.com/