Amila,

The classes in org.apache.axis2.transport.base.datagram should not
refer to SocketAddress, since that type is specific to UDP. Can you
please think about a better solution?

Andreas

On Fri, Dec 4, 2009 at 09:33,  <ami...@apache.org> wrote:
> Author: amilas
> Date: Fri Dec  4 08:32:41 2009
> New Revision: 887108
>
> URL: http://svn.apache.org/viewvc?rev=887108&view=rev
> Log:
> apply the patch for WSCOMMONS-511
>
> Added:
>    
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
> Modified:
>    
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
>    
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
>    
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
>    
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
>    
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
>    
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
>
> Modified: 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
> URL: 
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=887108&r1=887107&r2=887108&view=diff
> ==============================================================================
> --- 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
>  (original)
> +++ 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
>  Fri Dec  4 08:32:41 2009
> @@ -20,6 +20,8 @@
>
>  import java.io.IOException;
>  import java.net.SocketException;
> +import java.net.SocketAddress;
> +import java.nio.channels.DatagramChannel;
>
>  import org.apache.axis2.AxisFault;
>  import org.apache.axis2.context.ConfigurationContext;
> @@ -38,8 +40,12 @@
>
>         super.init(cfgCtx, transportIn);
>         DatagramDispatcherCallback callback = new 
> DatagramDispatcherCallback() {
> -            public void receive(DatagramEndpoint endpoint, byte[] data, int 
> length) {
> -                workerPool.execute(new ProcessPacketTask(endpoint, data, 
> length));
> +
> +            public void receive(SocketAddress address,
> +                                DatagramEndpoint endpoint,
> +                                byte[] data,
> +                                int length) {
> +                workerPool.execute(new ProcessPacketTask(address, endpoint, 
> data, length));
>             }
>         };
>         try {
>
> Modified: 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
> URL: 
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java?rev=887108&r1=887107&r2=887108&view=diff
> ==============================================================================
> --- 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
>  (original)
> +++ 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
>  Fri Dec  4 08:32:41 2009
> @@ -18,6 +18,12 @@
>  */
>  package org.apache.axis2.transport.base.datagram;
>
> +import java.nio.channels.DatagramChannel;
> +import java.net.SocketAddress;
> +
>  public interface DatagramDispatcherCallback {
> -    void receive(DatagramEndpoint endpoint, byte[] data, int length);
> +    void receive(SocketAddress address,
> +                 DatagramEndpoint endpoint,
> +                 byte[] data,
> +                 int length);
>  }
>
> Added: 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
> URL: 
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java?rev=887108&view=auto
> ==============================================================================
> --- 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
>  (added)
> +++ 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
>  Fri Dec  4 08:32:41 2009
> @@ -0,0 +1,44 @@
> +/*
> + * Copyright 2004,2005 The Apache Software Foundation.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.axis2.transport.base.datagram;
> +
> +import org.apache.axis2.transport.OutTransportInfo;
> +
> +import java.nio.channels.DatagramChannel;
> +import java.net.SocketAddress;
> +
> +public class DatagramOutTransportInfo implements OutTransportInfo {
> +    //out transport for back channel
> +    protected SocketAddress sourceAddress;
> +    protected String contentType;
> +
> +    public SocketAddress getSourceAddress() {
> +        return sourceAddress;
> +    }
> +
> +    public void setSourceAddress(SocketAddress sourceAddress) {
> +        this.sourceAddress = sourceAddress;
> +    }
> +
> +    public String getContentType() {
> +        return contentType;
> +    }
> +
> +    public void setContentType(String contentType) {
> +        this.contentType = contentType;
> +    }
> +}
>
> Modified: 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
> URL: 
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java?rev=887108&r1=887107&r2=887108&view=diff
> ==============================================================================
> --- 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
>  (original)
> +++ 
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
>  Fri Dec  4 08:32:41 2009
> @@ -20,6 +20,8 @@
>
>  import java.io.ByteArrayInputStream;
>  import java.io.InputStream;
> +import java.nio.channels.DatagramChannel;
> +import java.net.SocketAddress;
>
>  import org.apache.axiom.soap.SOAPEnvelope;
>  import org.apache.axis2.context.MessageContext;
> @@ -28,6 +30,7 @@
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  import org.apache.axis2.transport.base.MetricsCollector;
> +import org.apache.axis2.Constants;
>
>  /**
>  * Task encapsulating the processing of a datagram.
> @@ -40,11 +43,21 @@
>     private final DatagramEndpoint endpoint;
>     private final byte[] data;
>     private final int length;
> +
> +    //back channel data
> +    private DatagramChannel datagramChannel;
> +    private SocketAddress address;
>
> -    public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int 
> length) {
> +    public ProcessPacketTask(SocketAddress address,
> +                             DatagramEndpoint endpoint,
> +                             byte[] data,
> +                             int length) {
>         this.endpoint = endpoint;
>         this.data = data;
>         this.length = length;
> +
> +        this.datagramChannel = datagramChannel;
> +        this.address = address;
>     }
>
>     public void run() {
> @@ -54,6 +67,14 @@
>             MessageContext msgContext = endpoint.createMessageContext();
>             SOAPEnvelope envelope = 
> TransportUtils.createSOAPMessage(msgContext, inputStream, 
> endpoint.getContentType());
>             msgContext.setEnvelope(envelope);
> +
> +            //create an out transport info object
> +            DatagramOutTransportInfo datagramOutTransportInfo = new 
> DatagramOutTransportInfo();
> +            
> datagramOutTransportInfo.setContentType(endpoint.getContentType());
> +            datagramOutTransportInfo.setSourceAddress(address);
> +
> +            msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, 
> datagramOutTransportInfo);
> +
>             AxisEngine.receive(msgContext);
>             metrics.incrementMessagesReceived();
>             metrics.incrementBytesReceived(length);
>
> Modified: 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
> URL: 
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java?rev=887108&r1=887107&r2=887108&view=diff
> ==============================================================================
> --- 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
>  (original)
> +++ 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
>  Fri Dec  4 08:32:41 2009
> @@ -252,7 +252,7 @@
>             if (log.isDebugEnabled()) {
>                 log.debug("Received packet from " + address + " with length " 
> + length);
>             }
> -            callback.receive(endpoint, data, length);
> +            callback.receive(address, endpoint, data, length);
>         } catch (IOException ex) {
>             endpoint.getMetrics().incrementFaultsReceiving();
>             log.error("Error receiving UDP packet", ex);
>
> Modified: 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
> URL: 
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java?rev=887108&r1=887107&r2=887108&view=diff
> ==============================================================================
> --- 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
>  (original)
> +++ 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
>  Fri Dec  4 08:32:41 2009
> @@ -23,15 +23,15 @@
>
>  import org.apache.axis2.AxisFault;
>  import org.apache.axis2.transport.OutTransportInfo;
> +import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;
>
>  /**
>  * Holder of information to send an outgoing message to a UDP destination.
>  */
> -public class UDPOutTransportInfo implements OutTransportInfo {
> +public class UDPOutTransportInfo extends DatagramOutTransportInfo {
>     private String host;
>     private int port;
> -    private String contentType;
> -
> +
>     public UDPOutTransportInfo(String eprString) throws AxisFault {
>         URI epr;
>         try {
> @@ -65,12 +65,4 @@
>     public void setPort(int port) {
>         this.port = port;
>     }
> -
> -    public String getContentType() {
> -        return contentType;
> -    }
> -
> -    public void setContentType(String contentType) {
> -        this.contentType = contentType;
> -    }
>  }
>
> Modified: 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
> URL: 
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java?rev=887108&r1=887107&r2=887108&view=diff
> ==============================================================================
> --- 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
>  (original)
> +++ 
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
>  Fri Dec  4 08:32:41 2009
> @@ -19,22 +19,33 @@
>  package org.apache.axis2.transport.udp;
>
>  import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.ByteArrayInputStream;
>  import java.net.DatagramPacket;
>  import java.net.DatagramSocket;
>  import java.net.InetAddress;
> +import java.net.SocketAddress;
> +import java.nio.channels.DatagramChannel;
> +import java.nio.ByteBuffer;
>
>  import org.apache.axiom.om.OMOutputFormat;
> +import org.apache.axiom.soap.SOAPEnvelope;
>  import org.apache.axis2.AxisFault;
>  import org.apache.axis2.context.ConfigurationContext;
>  import org.apache.axis2.context.MessageContext;
>  import org.apache.axis2.description.TransportOutDescription;
> +import org.apache.axis2.description.WSDL2Constants;
> +import org.apache.axis2.description.OutInAxisOperation;
>  import org.apache.axis2.transport.MessageFormatter;
>  import org.apache.axis2.transport.OutTransportInfo;
>  import org.apache.axis2.transport.TransportUtils;
>  import org.apache.axis2.transport.base.AbstractTransportSender;
>  import org.apache.axis2.transport.base.BaseUtils;
> +import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;
>  import org.apache.commons.logging.LogFactory;
>
> +import javax.xml.stream.XMLStreamException;
> +
>  /**
>  * Transport sender for the UDP protocol.
>  *
> @@ -52,22 +63,72 @@
>
>     @Override
>     public void sendMessage(MessageContext msgContext, String targetEPR, 
> OutTransportInfo outTransportInfo) throws AxisFault {
> -        UDPOutTransportInfo udpOutInfo = new UDPOutTransportInfo(targetEPR);
> -        MessageFormatter messageFormatter = 
> TransportUtils.getMessageFormatter(msgContext);
> -        OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
> -        format.setContentType(udpOutInfo.getContentType());
> -        byte[] payload = messageFormatter.getBytes(msgContext, format);
> -        try {
> -            DatagramSocket socket = new DatagramSocket();
> +        if ((targetEPR == null) && (outTransportInfo != null)) {
> +            // this can happen only at the server side, where the message is sent 
> using the back channel
> +            DatagramOutTransportInfo datagramOutTransportInfo = 
> (DatagramOutTransportInfo) outTransportInfo;
> +            MessageFormatter messageFormatter = 
> TransportUtils.getMessageFormatter(msgContext);
> +            OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
> +            format.setContentType(datagramOutTransportInfo.getContentType());
> +            byte[] payload = messageFormatter.getBytes(msgContext, format);
> +
> +            ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length);
> +            byteBuffer.put(payload);
> +
> +            DatagramSocket socket = null;
>             try {
> -                socket.send(new DatagramPacket(payload, payload.length, 
> InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
> -            }
> -            finally {
> +                socket = new DatagramSocket();
> +                socket.send(new DatagramPacket(payload, payload.length, 
> datagramOutTransportInfo.getSourceAddress()));
> +            } catch (IOException e) {
> +                throw new AxisFault("Unable to send packet", e);
> +            } finally {
>                 socket.close();
>             }
> +
> +        } else {
> +            UDPOutTransportInfo udpOutInfo = new 
> UDPOutTransportInfo(targetEPR);
> +            MessageFormatter messageFormatter = 
> TransportUtils.getMessageFormatter(msgContext);
> +            OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
> +            format.setContentType(udpOutInfo.getContentType());
> +            byte[] payload = messageFormatter.getBytes(msgContext, format);
> +            try {
> +                DatagramSocket socket = new DatagramSocket();
> +                try {
> +                    socket.send(new DatagramPacket(payload, payload.length, 
> InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
> +                    if (!msgContext.getOptions().isUseSeparateListener() && 
> !msgContext.isServerSide()){
> +                        waitForReply(msgContext, socket, 
> udpOutInfo.getContentType());
> +                    }
> +                }
> +                finally {
> +                    socket.close();
> +                }
> +            }
> +            catch (IOException ex) {
> +                throw new AxisFault("Unable to send packet", ex);
> +            }
> +        }
> +    }
> +
> +    private void waitForReply(MessageContext messageContext, DatagramSocket 
> datagramSocket, String contentType) throws IOException {
> +
> +        // piggy back message constant is used to pass a piggy back
> +        // message context in async model
> +        if (!(messageContext.getAxisOperation() instanceof 
> OutInAxisOperation) &&
> +                
> (messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == 
> null)) {
> +            return;
>         }
> -        catch (IOException ex) {
> -            throw new AxisFault("Unable to send packet", ex);
> +
> +        byte[] inputBuffer = new byte[4096]; //TODO set the maximum size 
> parameter
> +        DatagramPacket packet = new DatagramPacket(inputBuffer, 
> inputBuffer.length);
> +        datagramSocket.receive(packet);
> +
> +        // create the soap envelope
> +        try {
> +            MessageContext respMessageContext = 
> messageContext.getOperationContext().getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
> +            InputStream inputStream = new ByteArrayInputStream(inputBuffer, 
> 0, inputBuffer.length);
> +            SOAPEnvelope envelope = 
> TransportUtils.createSOAPMessage(respMessageContext, inputStream, 
> contentType);
> +            respMessageContext.setEnvelope(envelope);
> +        } catch (XMLStreamException e) {
> +            throw new AxisFault("Can not build the soap message ", e);
>         }
>     }
>  }
>
>
>

Reply via email to