Author: amilas
Date: Fri Dec  4 08:32:41 2009
New Revision: 887108

URL: http://svn.apache.org/viewvc?rev=887108&view=rev
Log:
apply the patch for WSCOMMONS-511

Added:
    
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
Modified:
    
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
    
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
    
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
    
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
    
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
    
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java

Modified: 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=887108&r1=887107&r2=887108&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
 Fri Dec  4 08:32:41 2009
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.net.SocketException;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
@@ -38,8 +40,12 @@
         
         super.init(cfgCtx, transportIn);
         DatagramDispatcherCallback callback = new DatagramDispatcherCallback() 
{
-            public void receive(DatagramEndpoint endpoint, byte[] data, int 
length) {
-                workerPool.execute(new ProcessPacketTask(endpoint, data, 
length));
+
+            public void receive(SocketAddress address,
+                                DatagramEndpoint endpoint,
+                                byte[] data,
+                                int length) {
+                workerPool.execute(new ProcessPacketTask(address, endpoint, 
data, length));
             }
         };
         try {

Modified: 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java?rev=887108&r1=887107&r2=887108&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramDispatcherCallback.java
 Fri Dec  4 08:32:41 2009
@@ -18,6 +18,12 @@
  */
 package org.apache.axis2.transport.base.datagram;
 
+import java.nio.channels.DatagramChannel;
+import java.net.SocketAddress;
+
 public interface DatagramDispatcherCallback {
-    void receive(DatagramEndpoint endpoint, byte[] data, int length);
+    void receive(SocketAddress address,
+                 DatagramEndpoint endpoint,
+                 byte[] data,
+                 int length);
 }

Added: 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java?rev=887108&view=auto
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
 (added)
+++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramOutTransportInfo.java
 Fri Dec  4 08:32:41 2009
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.base.datagram;
+
+import org.apache.axis2.transport.OutTransportInfo;
+
+import java.nio.channels.DatagramChannel;
+import java.net.SocketAddress;
+
+public class DatagramOutTransportInfo implements OutTransportInfo {
+    //out transport for back chanel
+    protected SocketAddress sourceAddress;
+    protected String contentType;
+
+    public SocketAddress getSourceAddress() {
+        return sourceAddress;
+    }
+
+    public void setSourceAddress(SocketAddress sourceAddress) {
+        this.sourceAddress = sourceAddress;
+    }
+
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+}

Modified: 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java?rev=887108&r1=887107&r2=887108&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/ProcessPacketTask.java
 Fri Dec  4 08:32:41 2009
@@ -20,6 +20,8 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.nio.channels.DatagramChannel;
+import java.net.SocketAddress;
 
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.context.MessageContext;
@@ -28,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.axis2.transport.base.MetricsCollector;
+import org.apache.axis2.Constants;
 
 /**
  * Task encapsulating the processing of a datagram.
@@ -40,11 +43,21 @@
     private final DatagramEndpoint endpoint;
     private final byte[] data;
     private final int length;
+
+    //back channel data
+    private DatagramChannel datagramChannel;
+    private SocketAddress address;
     
-    public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int 
length) {
+    public ProcessPacketTask(SocketAddress address,
+                             DatagramEndpoint endpoint,
+                             byte[] data,
+                             int length) {
         this.endpoint = endpoint;
         this.data = data;
         this.length = length;
+
+        this.datagramChannel = datagramChannel;
+        this.address = address;
     }
     
     public void run() {
@@ -54,6 +67,14 @@
             MessageContext msgContext = endpoint.createMessageContext();
             SOAPEnvelope envelope = 
TransportUtils.createSOAPMessage(msgContext, inputStream, 
endpoint.getContentType());
             msgContext.setEnvelope(envelope);
+
+            //create and out transport info object
+            DatagramOutTransportInfo datagramOutTransportInfo = new 
DatagramOutTransportInfo();
+            datagramOutTransportInfo.setContentType(endpoint.getContentType());
+            datagramOutTransportInfo.setSourceAddress(address);
+
+            msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, 
datagramOutTransportInfo);
+
             AxisEngine.receive(msgContext);
             metrics.incrementMessagesReceived();
             metrics.incrementBytesReceived(length);

Modified: 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java?rev=887108&r1=887107&r2=887108&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
 Fri Dec  4 08:32:41 2009
@@ -252,7 +252,7 @@
             if (log.isDebugEnabled()) {
                 log.debug("Received packet from " + address + " with length " 
+ length);
             }
-            callback.receive(endpoint, data, length);
+            callback.receive(address, endpoint, data, length);
         } catch (IOException ex) {
             endpoint.getMetrics().incrementFaultsReceiving();
             log.error("Error receiving UDP packet", ex);

Modified: 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java?rev=887108&r1=887107&r2=887108&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPOutTransportInfo.java
 Fri Dec  4 08:32:41 2009
@@ -23,15 +23,15 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;
 
 /**
  * Holder of information to send an outgoing message to a UDP destination.
  */
-public class UDPOutTransportInfo implements OutTransportInfo {
+public class UDPOutTransportInfo extends DatagramOutTransportInfo {
     private String host;
     private int port;
-    private String contentType;
-    
+
     public UDPOutTransportInfo(String eprString) throws AxisFault {
         URI epr;
         try {
@@ -65,12 +65,4 @@
     public void setPort(int port) {
         this.port = port;
     }
-
-    public String getContentType() {
-        return contentType;
-    }
-
-    public void setContentType(String contentType) {
-        this.contentType = contentType;
-    }
 }

Modified: 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java?rev=887108&r1=887107&r2=887108&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
 Fri Dec  4 08:32:41 2009
@@ -19,22 +19,33 @@
 package org.apache.axis2.transport.udp;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.ByteBuffer;
 
 import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.description.WSDL2Constants;
+import org.apache.axis2.description.OutInAxisOperation;
 import org.apache.axis2.transport.MessageFormatter;
 import org.apache.axis2.transport.OutTransportInfo;
 import org.apache.axis2.transport.TransportUtils;
 import org.apache.axis2.transport.base.AbstractTransportSender;
 import org.apache.axis2.transport.base.BaseUtils;
+import org.apache.axis2.transport.base.datagram.DatagramOutTransportInfo;
 import org.apache.commons.logging.LogFactory;
 
+import javax.xml.stream.XMLStreamException;
+
 /**
  * Transport sender for the UDP protocol.
  * 
@@ -52,22 +63,72 @@
     
     @Override
     public void sendMessage(MessageContext msgContext, String targetEPR, 
OutTransportInfo outTransportInfo) throws AxisFault {
-        UDPOutTransportInfo udpOutInfo = new UDPOutTransportInfo(targetEPR);
-        MessageFormatter messageFormatter = 
TransportUtils.getMessageFormatter(msgContext);
-        OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
-        format.setContentType(udpOutInfo.getContentType());
-        byte[] payload = messageFormatter.getBytes(msgContext, format);
-        try {
-            DatagramSocket socket = new DatagramSocket();
+        if ((targetEPR == null) && (outTransportInfo != null)) {
+            // this can happen only at the server side and send the message 
using back chanel
+            DatagramOutTransportInfo datagramOutTransportInfo = 
(DatagramOutTransportInfo) outTransportInfo;
+            MessageFormatter messageFormatter = 
TransportUtils.getMessageFormatter(msgContext);
+            OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+            format.setContentType(datagramOutTransportInfo.getContentType());
+            byte[] payload = messageFormatter.getBytes(msgContext, format);
+
+            ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length);
+            byteBuffer.put(payload);
+
+            DatagramSocket socket = null;
             try {
-                socket.send(new DatagramPacket(payload, payload.length, 
InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
-            }
-            finally {
+                socket = new DatagramSocket();
+                socket.send(new DatagramPacket(payload, payload.length, 
datagramOutTransportInfo.getSourceAddress()));
+            } catch (IOException e) {
+                throw new AxisFault("Unable to send packet", e);
+            } finally {
                 socket.close();
             }
+
+        } else {
+            UDPOutTransportInfo udpOutInfo = new 
UDPOutTransportInfo(targetEPR);
+            MessageFormatter messageFormatter = 
TransportUtils.getMessageFormatter(msgContext);
+            OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+            format.setContentType(udpOutInfo.getContentType());
+            byte[] payload = messageFormatter.getBytes(msgContext, format);
+            try {
+                DatagramSocket socket = new DatagramSocket();
+                try {
+                    socket.send(new DatagramPacket(payload, payload.length, 
InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
+                    if (!msgContext.getOptions().isUseSeparateListener() && 
!msgContext.isServerSide()){
+                        waitForReply(msgContext, socket, 
udpOutInfo.getContentType());
+                    }
+                }
+                finally {
+                    socket.close();
+                }
+            }
+            catch (IOException ex) {
+                throw new AxisFault("Unable to send packet", ex);
+            }
+        }
+    }
+
+    private void waitForReply(MessageContext messageContext, DatagramSocket 
datagramSocket, String contentType) throws IOException {
+
+        // piggy back message constant is used to pass a piggy back
+        // message context in asnych model
+        if (!(messageContext.getAxisOperation() instanceof OutInAxisOperation) 
&&
+                
(messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == 
null)) {
+            return;
         }
-        catch (IOException ex) {
-            throw new AxisFault("Unable to send packet", ex);
+
+        byte[] inputBuffer = new byte[4096]; //TODO set the maximum size 
parameter
+        DatagramPacket packet = new DatagramPacket(inputBuffer, 
inputBuffer.length);
+        datagramSocket.receive(packet);
+
+        // create the soap envelope
+        try {
+            MessageContext respMessageContext = 
messageContext.getOperationContext().getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
+            InputStream inputStream = new ByteArrayInputStream(inputBuffer, 0, 
inputBuffer.length);
+            SOAPEnvelope envelope = 
TransportUtils.createSOAPMessage(respMessageContext, inputStream, contentType);
+            respMessageContext.setEnvelope(envelope);
+        } catch (XMLStreamException e) {
+            throw new AxisFault("Can not build the soap message ", e);
         }
     }
 }


Reply via email to