On Tue, Dec 22, 2009 at 12:57 AM, Andreas Veithen <andreas.veit...@gmail.com
> wrote:

> Amila,
>
> The classes in org.apache.axis2.transport.base.datagram should not
> refer to SocketAddress, since this is something specific to UDP. Can
> you please think about a better solution?
>

I will see.
In that case, I think most of the stuff in base.datagram will have to move to
the udp transport module.


thanks,
Amila.

>
> Andreas
>
> On Fri, Dec 4, 2009 at 09:33,  <ami...@apache.org> wrote:
> > Author: amilas
> > Date: Fri Dec  4 08:32:41 2009
> > New Revision: 887108
> >
> > URL: http://svn.apache.org/viewvc?rev=887108&view=rev
> > Log:
> > apply the patch for WSCOMMONS-511
> >
> > Added:
> >
>  
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
> > Modified:
> >
>  
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
> >
>  
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
> >
>  
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
> >
>  
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
> >
>  
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
> >
>  
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
> >
> > Modified:
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
> > URL:
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=887108&r1=887107&r2=887108&view=diff
> >
> ==============================================================================
> > ---
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
> (original)
> > +++
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
> Fri Dec  4 08:32:41 2009
> > @@ -20,6 +20,8 @@
> >
> >  import java.io.IOException;
> >  import java.net.SocketException;
> > +import java.net.SocketAddress;
> > +import java.nio.channels.DatagramChannel;
> >
> >  import org.apache.axis2.AxisFault;
> >  import org.apache.axis2.context.ConfigurationContext;
> > @@ -38,8 +40,12 @@
> >
> >         super.init(cfgCtx, transportIn);
> >         DatagramDispatcherCallback callback = new
> DatagramDispatcherCallback() {
> > -            public void receive(DatagramEndpoint endpoint, byte[] data,
> int length) {
> > -                workerPool.execute(new ProcessPacketTask(endpoint, data,
> length));
> > +
> > +            public void receive(SocketAddress address,
> > +                                DatagramEndpoint endpoint,
> > +                                byte[] data,
> > +                                int length) {
> > +                workerPool.execute(new ProcessPacketTask(address,
> endpoint, data, length));
> >             }
> >         };
> >         try {
> >
> > Modified:
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
> > URL:
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java?rev=887108&r1=887107&r2=887108&view=diff
> >
> ==============================================================================
> > ---
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
> (original)
> > +++
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
> Fri Dec  4 08:32:41 2009
> > @@ -18,6 +18,12 @@
> >  */
> >  package org.apache.axis2.transport.base.datagram;
> >
> > +import java.nio.channels.DatagramChannel;
> > +import java.net.SocketAddress;
> > +
> >  public interface DatagramDispatcherCallback {
> > -    void receive(DatagramEndpoint endpoint, byte[] data, int length);
> > +    void receive(SocketAddress address,
> > +                 DatagramEndpoint endpoint,
> > +                 byte[] data,
> > +                 int length);
> >  }
> >
> > Added:
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
> > URL:
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java?rev=887108&view=auto
> >
> ==============================================================================
> > ---
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
> (added)
> > +++
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
> Fri Dec  4 08:32:41 2009
> > @@ -0,0 +1,44 @@
> > +/*
> > + * Copyright 2004,2005 The Apache Software Foundation.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at
> > + *
> > + *      http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.axis2.transport.base.datagram;
> > +
> > +import org.apache.axis2.transport.OutTransportInfo;
> > +
> > +import java.nio.channels.DatagramChannel;
> > +import java.net.SocketAddress;
> > +
> > +public class DatagramOutTransportInfo implements OutTransportInfo {
> > +    //out transport for back chanel
> > +    protected SocketAddress sourceAddress;
> > +    protected String contentType;
> > +
> > +    public SocketAddress getSourceAddress() {
> > +        return sourceAddress;
> > +    }
> > +
> > +    public void setSourceAddress(SocketAddress sourceAddress) {
> > +        this.sourceAddress = sourceAddress;
> > +    }
> > +
> > +    public String getContentType() {
> > +        return contentType;
> > +    }
> > +
> > +    public void setContentType(String contentType) {
> > +        this.contentType = contentType;
> > +    }
> > +}
> >
> > Modified:
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
> > URL:
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java?rev=887108&r1=887107&r2=887108&view=diff
> >
> ==============================================================================
> > ---
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
> (original)
> > +++
> webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
> Fri Dec  4 08:32:41 2009
> > @@ -20,6 +20,8 @@
> >
> >  import java.io.ByteArrayInputStream;
> >  import java.io.InputStream;
> > +import java.nio.channels.DatagramChannel;
> > +import java.net.SocketAddress;
> >
> >  import org.apache.axiom.soap.SOAPEnvelope;
> >  import org.apache.axis2.context.MessageContext;
> > @@ -28,6 +30,7 @@
> >  import org.apache.commons.logging.Log;
> >  import org.apache.commons.logging.LogFactory;
> >  import org.apache.axis2.transport.base.MetricsCollector;
> > +import org.apache.axis2.Constants;
> >
> >  /**
> >  * Task encapsulating the processing of a datagram.
> > @@ -40,11 +43,21 @@
> >     private final DatagramEndpoint endpoint;
> >     private final byte[] data;
> >     private final int length;
> > +
> > +    //back channel data
> > +    private DatagramChannel datagramChannel;
> > +    private SocketAddress address;
> >
> > -    public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int
> length) {
> > +    public ProcessPacketTask(SocketAddress address,
> > +                             DatagramEndpoint endpoint,
> > +                             byte[] data,
> > +                             int length) {
> >         this.endpoint = endpoint;
> >         this.data = data;
> >         this.length = length;
> > +
> > +        this.datagramChannel = datagramChannel;
> > +        this.address = address;
> >     }
> >
> >     public void run() {
> > @@ -54,6 +67,14 @@
> >             MessageContext msgContext = endpoint.createMessageContext();
> >             SOAPEnvelope envelope =
> TransportUtils.createSOAPMessage(msgContext, inputStream,
> endpoint.getContentType());
> >             msgContext.setEnvelope(envelope);
> > +
> > +            //create and out transport info object
> > +            DatagramOutTransportInfo datagramOutTransportInfo = new
> DatagramOutTransportInfo();
> > +
>  datagramOutTransportInfo.setContentType(endpoint.getContentType());
> > +            datagramOutTransportInfo.setSourceAddress(address);
> > +
> > +            msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
> datagramOutTransportInfo);
> > +
> >             AxisEngine.receive(msgContext);
> >             metrics.incrementMessagesReceived();
> >             metrics.incrementBytesReceived(length);
> >
> > Modified:
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
> > URL:
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java?rev=887108&r1=887107&r2=887108&view=diff
> >
> ==============================================================================
> > ---
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
> (original)
> > +++
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
> Fri Dec  4 08:32:41 2009
> > @@ -252,7 +252,7 @@
> >             if (log.isDebugEnabled()) {
> >                 log.debug("Received packet from " + address + " with
> length " + length);
> >             }
> > -            callback.receive(endpoint, data, length);
> > +            callback.receive(address, endpoint, data, length);
> >         } catch (IOException ex) {
> >             endpoint.getMetrics().incrementFaultsReceiving();
> >             log.error("Error receiving UDP packet", ex);
> >
> > Modified:
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
> > URL:
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java?rev=887108&r1=887107&r2=887108&view=diff
> >
> ==============================================================================
> > ---
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
> (original)
> > +++
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
> Fri Dec  4 08:32:41 2009
> > @@ -23,15 +23,15 @@
> >
> >  import org.apache.axis2.AxisFault;
> >  import org.apache.axis2.transport.OutTransportInfo;
> > +import
> org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;
> >
> >  /**
> >  * Holder of information to send an outgoing message to a UDP
> destination.
> >  */
> > -public class UDPOutTransportInfo implements OutTransportInfo {
> > +public class UDPOutTransportInfo extends DatagramOutTransportInfo {
> >     private String host;
> >     private int port;
> > -    private String contentType;
> > -
> > +
> >     public UDPOutTransportInfo(String eprString) throws AxisFault {
> >         URI epr;
> >         try {
> > @@ -65,12 +65,4 @@
> >     public void setPort(int port) {
> >         this.port = port;
> >     }
> > -
> > -    public String getContentType() {
> > -        return contentType;
> > -    }
> > -
> > -    public void setContentType(String contentType) {
> > -        this.contentType = contentType;
> > -    }
> >  }
> >
> > Modified:
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
> > URL:
> http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java?rev=887108&r1=887107&r2=887108&view=diff
> >
> ==============================================================================
> > ---
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
> (original)
> > +++
> webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
> Fri Dec  4 08:32:41 2009
> > @@ -19,22 +19,33 @@
> >  package org.apache.axis2.transport.udp;
> >
> >  import java.io.IOException;
> > +import java.io.InputStream;
> > +import java.io.ByteArrayInputStream;
> >  import java.net.DatagramPacket;
> >  import java.net.DatagramSocket;
> >  import java.net.InetAddress;
> > +import java.net.SocketAddress;
> > +import java.nio.channels.DatagramChannel;
> > +import java.nio.ByteBuffer;
> >
> >  import org.apache.axiom.om.OMOutputFormat;
> > +import org.apache.axiom.soap.SOAPEnvelope;
> >  import org.apache.axis2.AxisFault;
> >  import org.apache.axis2.context.ConfigurationContext;
> >  import org.apache.axis2.context.MessageContext;
> >  import org.apache.axis2.description.TransportOutDescription;
> > +import org.apache.axis2.description.WSDL2Constants;
> > +import org.apache.axis2.description.OutInAxisOperation;
> >  import org.apache.axis2.transport.MessageFormatter;
> >  import org.apache.axis2.transport.OutTransportInfo;
> >  import org.apache.axis2.transport.TransportUtils;
> >  import org.apache.axis2.transport.base.AbstractTransportSender;
> >  import org.apache.axis2.transport.base.BaseUtils;
> > +import
> org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;
> >  import org.apache.commons.logging.LogFactory;
> >
> > +import javax.xml.stream.XMLStreamException;
> > +
> >  /**
> >  * Transport sender for the UDP protocol.
> >  *
> > @@ -52,22 +63,72 @@
> >
> >     @Override
> >     public void sendMessage(MessageContext msgContext, String targetEPR,
> OutTransportInfo outTransportInfo) throws AxisFault {
> > -        UDPOutTransportInfo udpOutInfo = new
> UDPOutTransportInfo(targetEPR);
> > -        MessageFormatter messageFormatter =
> TransportUtils.getMessageFormatter(msgContext);
> > -        OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
> > -        format.setContentType(udpOutInfo.getContentType());
> > -        byte[] payload = messageFormatter.getBytes(msgContext, format);
> > -        try {
> > -            DatagramSocket socket = new DatagramSocket();
> > +        if ((targetEPR == null) && (outTransportInfo != null)) {
> > +            // this can happen only at the server side and send the
> message using back chanel
> > +            DatagramOutTransportInfo datagramOutTransportInfo =
> (DatagramOutTransportInfo) outTransportInfo;
> > +            MessageFormatter messageFormatter =
> TransportUtils.getMessageFormatter(msgContext);
> > +            OMOutputFormat format =
> BaseUtils.getOMOutputFormat(msgContext);
> > +
>  format.setContentType(datagramOutTransportInfo.getContentType());
> > +            byte[] payload = messageFormatter.getBytes(msgContext,
> format);
> > +
> > +            ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length);
> > +            byteBuffer.put(payload);
> > +
> > +            DatagramSocket socket = null;
> >             try {
> > -                socket.send(new DatagramPacket(payload, payload.length,
> InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
> > -            }
> > -            finally {
> > +                socket = new DatagramSocket();
> > +                socket.send(new DatagramPacket(payload, payload.length,
> datagramOutTransportInfo.getSourceAddress()));
> > +            } catch (IOException e) {
> > +                throw new AxisFault("Unable to send packet", e);
> > +            } finally {
> >                 socket.close();
> >             }
> > +
> > +        } else {
> > +            UDPOutTransportInfo udpOutInfo = new
> UDPOutTransportInfo(targetEPR);
> > +            MessageFormatter messageFormatter =
> TransportUtils.getMessageFormatter(msgContext);
> > +            OMOutputFormat format =
> BaseUtils.getOMOutputFormat(msgContext);
> > +            format.setContentType(udpOutInfo.getContentType());
> > +            byte[] payload = messageFormatter.getBytes(msgContext,
> format);
> > +            try {
> > +                DatagramSocket socket = new DatagramSocket();
> > +                try {
> > +                    socket.send(new DatagramPacket(payload,
> payload.length, InetAddress.getByName(udpOutInfo.getHost()),
> udpOutInfo.getPort()));
> > +                    if (!msgContext.getOptions().isUseSeparateListener()
> && !msgContext.isServerSide()){
> > +                        waitForReply(msgContext, socket,
> udpOutInfo.getContentType());
> > +                    }
> > +                }
> > +                finally {
> > +                    socket.close();
> > +                }
> > +            }
> > +            catch (IOException ex) {
> > +                throw new AxisFault("Unable to send packet", ex);
> > +            }
> > +        }
> > +    }
> > +
> > +    private void waitForReply(MessageContext messageContext,
> DatagramSocket datagramSocket, String contentType) throws IOException {
> > +
> > +        // piggy back message constant is used to pass a piggy back
> > +        // message context in asnych model
> > +        if (!(messageContext.getAxisOperation() instanceof
> OutInAxisOperation) &&
> > +
>  (messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE)
> == null)) {
> > +            return;
> >         }
> > -        catch (IOException ex) {
> > -            throw new AxisFault("Unable to send packet", ex);
> > +
> > +        byte[] inputBuffer = new byte[4096]; //TODO set the maximum size
> parameter
> > +        DatagramPacket packet = new DatagramPacket(inputBuffer,
> inputBuffer.length);
> > +        datagramSocket.receive(packet);
> > +
> > +        // create the soap envelope
> > +        try {
> > +            MessageContext respMessageContext =
> messageContext.getOperationContext().getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
> > +            InputStream inputStream = new
> ByteArrayInputStream(inputBuffer, 0, inputBuffer.length);
> > +            SOAPEnvelope envelope =
> TransportUtils.createSOAPMessage(respMessageContext, inputStream,
> contentType);
> > +            respMessageContext.setEnvelope(envelope);
> > +        } catch (XMLStreamException e) {
> > +            throw new AxisFault("Can not build the soap message ", e);
> >         }
> >     }
> >  }
> >
> >
> >
>



-- 
Amila Suriarachchi
WSO2 Inc.
blog: http://amilachinthaka.blogspot.com/

Reply via email to