Author: veithen
Date: Wed May  7 14:31:29 2008
New Revision: 654291

URL: http://svn.apache.org/viewvc?rev=654291&view=rev
Log:
SYNAPSE-282: Added UDP transport listener (tested with a Synapse proxy and the 
syslog message builder).

Added:
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Endpoint.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/IODispatcher.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/ProcessPacketTask.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPConstants.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPListener.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Utils.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/package-info.java

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Endpoint.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Endpoint.java?rev=654291&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Endpoint.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Endpoint.java
 Wed May  7 14:31:29 2008
@@ -0,0 +1,75 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.transport.udp;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.description.AxisService;
+import org.apache.synapse.transport.base.MetricsCollector;
+
+/**
+ * UDP endpoint description.
+ * This class is used by the transport to store information
+ * about an endpoint, i.e. a UDP port the transport listens on and that is
+ * bound to a given Axis service.
+ */
+public class Endpoint {
+    private final UDPListener listener;
+    private final int port;
+    private final String contentType;
+    private final int maxPacketSize;
+    private final AxisService service;
+    private final MetricsCollector metrics;
+    
+    public Endpoint(UDPListener listener, int port, String contentType, int 
maxPacketSize, AxisService service, MetricsCollector metrics) {
+        this.listener = listener;
+        this.port = port;
+        this.contentType = contentType;
+        this.maxPacketSize = maxPacketSize;
+        this.service = service;
+        this.metrics = metrics;
+    }
+
+    public UDPListener getListener() {
+        return listener;
+    }
+
+    public int getPort() {
+        return port;
+    }
+    
+    public String getContentType() {
+        return contentType;
+    }
+
+    public int getMaxPacketSize() {
+        return maxPacketSize;
+    }
+
+    public AxisService getService() {
+        return service;
+    }
+
+    public MetricsCollector getMetrics() {
+        return metrics;
+    }
+    
+    public EndpointReference getEndpointReference(String ip) {
+        return new EndpointReference("udp://" + ip + ":" + getPort() + 
"?contentType=" + contentType);
+    }
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/IODispatcher.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/IODispatcher.java?rev=654291&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/IODispatcher.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/IODispatcher.java
 Wed May  7 14:31:29 2008
@@ -0,0 +1,259 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.transport.udp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.base.threads.WorkerPool;
+
+/**
+ * I/O dispatcher for incoming UDP packets.
+ * This class is responsible for receiving UDP packets and dispatch
+ * the processing of these packets to worker threads.
+ * It uses a [EMAIL PROTECTED] Selector} to receive packets from multiple 
endpoints
+ * and a [EMAIL PROTECTED] WorkerPool} to dispatch the processing tasks.
+ * <p>
+ * The dispatcher uses the following thread model:
+ * Incoming packets for all the registered endpoints are received
+ * in the thread that executes the [EMAIL PROTECTED] #run()} method. For every
+ * packet received, a [EMAIL PROTECTED] ProcessPacketTask} instance is created
+ * and dispatched to a worker thread from the configured pool.
+ * <p>
+ * The methods [EMAIL PROTECTED] #addEndpoint(Endpoint)}, [EMAIL PROTECTED] 
#removeEndpoint(String)}
+ * and [EMAIL PROTECTED] #stop()} are thread safe and may be called from any 
thread.
+ * However, to avoid concurrency issues, the operation on the underlying
+ * [EMAIL PROTECTED] Selector} will always be executed by the thread executing 
the
+ * [EMAIL PROTECTED] #run()} method. The three methods mentioned above will 
block until
+ * the operation has completed.
+ */
+public class IODispatcher implements Runnable {
+    private static abstract class SelectorOperation {
+        private final CountDownLatch done = new CountDownLatch(1);
+        private IOException exception;
+        
+        public void waitForCompletion() throws IOException, 
InterruptedException {
+            done.await();
+            if (exception != null) {
+                throw exception;
+            }
+        }
+        
+        public void execute(Selector selector) {
+            try {
+                doExecute(selector);
+            } catch (IOException ex) {
+                exception = ex;
+            } catch (Throwable ex) {
+                exception = new IOException("Unexpected exception");
+                exception.initCause(ex);
+            }
+            done.countDown();
+        }
+        
+        public abstract void doExecute(Selector selector) throws IOException;
+    }
+    
+    private static final Log log = LogFactory.getLog(IODispatcher.class);
+    
+    private final WorkerPool workerPool;
+    private final Selector selector;
+    private final Queue<SelectorOperation> selectorOperationQueue = new 
ConcurrentLinkedQueue<SelectorOperation>();
+    
+    /**
+     * Constructor.
+     * 
+     * @param workerPool the worker pool to dispatch processing task to
+     * @throws IOException if the [EMAIL PROTECTED] Selector} instance could 
not be created
+     */
+    public IODispatcher(WorkerPool workerPool) throws IOException {
+        this.workerPool = workerPool;
+        selector = Selector.open();
+    }
+    
+    /**
+     * Add a new endpoint. This method creates a new socket listening on
+     * the UDP port specified in the endpoint description and makes sure
+     * that incoming packets are routed to the specified service.
+     * 
+     * @param endpoint the endpoint description
+     * @throws IOException if the socket could not be created or
+     *         registered with the selector
+     */
+    public void addEndpoint(final Endpoint endpoint) throws IOException {
+        final DatagramChannel channel = DatagramChannel.open();
+        channel.socket().bind(new InetSocketAddress(endpoint.getPort()));
+        channel.configureBlocking(false);
+        execute(new SelectorOperation() {
+            @Override
+            public void doExecute(Selector selector) throws IOException {
+                channel.register(selector, SelectionKey.OP_READ, endpoint);
+            }
+        });
+    }
+    
+    /**
+     * Remove an endpoint. This causes the corresponding UDP socket to be
+     * closed.
+     * 
+     * @param serviceName the name of the service corresponding to
+     *                    the endpoint
+     * @throws IOException if an error occurred when closing the socket
+     */
+    public void removeEndpoint(final String serviceName) throws IOException {
+        execute(new SelectorOperation() {
+            @Override
+            public void doExecute(Selector selector) throws IOException {
+                Iterator<SelectionKey> it = selector.keys().iterator();
+                while (it.hasNext()) {
+                    SelectionKey key = it.next();
+                    Endpoint endpoint = (Endpoint)key.attachment();
+                    if (serviceName.equals(endpoint.getService().getName())) {
+                        key.cancel();
+                        key.channel().close();
+                        break;
+                    }
+                }
+            }
+        });
+    }
+    
+    /**
+     * Stop the dispatcher.
+     * This method closes all sockets and causes the execution of the
+     * [EMAIL PROTECTED] #run()} method to stop.
+     * 
+     * @throws IOException
+     */
+    public void stop() throws IOException {
+        execute(new SelectorOperation() {
+            @Override
+            public void doExecute(Selector selector) throws IOException {
+                IOException exception = null;
+                for (SelectionKey key : selector.keys()) {
+                    try {
+                        key.channel().close();
+                    } catch (IOException ex) {
+                        if (exception == null) {
+                            exception = ex;
+                        }
+                    }
+                }
+                try {
+                    selector.close();
+                } catch (IOException ex) {
+                    if (exception == null) {
+                        exception = ex;
+                    }
+                }
+                if (exception != null) {
+                    throw exception;
+                }
+            }
+        });
+    }
+    
+    /**
+     * Run the I/O dispatcher.
+     * This method contains the event loop that polls the selector, reads the 
incoming
+     * packets and dispatches the work.
+     * It only returns when [EMAIL PROTECTED] #stop()} is called.
+     */
+    public void run() {
+        while (true) {
+            try {
+                selector.select();
+            } catch (IOException ex) {
+                log.error("Exception in select; I/O dispatcher will be shut 
down", ex);
+                return;
+            }
+            // Execute pending selector operations
+            while (true) {
+                SelectorOperation request = selectorOperationQueue.poll();
+                if (request == null) {
+                    break;
+                }
+                request.execute(selector);
+                if (!selector.isOpen()) {
+                    return;
+                }
+            }
+            for (Iterator<SelectionKey> it = 
selector.selectedKeys().iterator(); it.hasNext(); ) {
+                SelectionKey key = it.next();
+                it.remove();
+                if (key.isValid() && key.isReadable()) {
+                    receive((Endpoint)key.attachment(), 
(DatagramChannel)key.channel());
+                }
+            }
+        }
+    }
+    
+    private void execute(SelectorOperation operation) throws IOException {
+        selectorOperationQueue.add(operation);
+        selector.wakeup();
+        // Waiting for the execution of the selector operation will
+        // never take a long time. It therefore makes no sense to
+        // propagate InterruptedExceptions. If one is thrown, we
+        // remember that and set the interruption status accordingly
+        // afterwards.
+        // See http://www.ibm.com/developerworks/java/library/j-jtp05236.html
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    operation.waitForCompletion();
+                    return;
+                } catch (InterruptedException ex) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+    
+    private void receive(Endpoint endpoint, DatagramChannel channel) {
+        try {
+            byte[] data = new byte[endpoint.getMaxPacketSize()];
+            ByteBuffer buffer = ByteBuffer.wrap(data);
+            SocketAddress address = channel.receive(buffer);
+            int length = buffer.position();
+            if (log.isDebugEnabled()) {
+                log.debug("Received packet from " + address + " with length " 
+ length);
+            }
+            workerPool.execute(new ProcessPacketTask(endpoint, data, length));
+        } catch (IOException ex) {
+            endpoint.getMetrics().incrementFaultsReceiving();
+            log.error("Error receiving UDP packet", ex);
+        }
+    }
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/ProcessPacketTask.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/ProcessPacketTask.java?rev=654291&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/ProcessPacketTask.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/ProcessPacketTask.java
 Wed May  7 14:31:29 2008
@@ -0,0 +1,68 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.transport.udp;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.base.MetricsCollector;
+
+/**
+ * Task encapsulating the processing of a packet.
+ * Instances of this class will be dispatched to worker threads for
+ * execution.
+ */
+public class ProcessPacketTask implements Runnable {
+    private static final Log log = LogFactory.getLog(ProcessPacketTask.class);
+    
+    private final Endpoint endpoint;
+    private final byte[] data;
+    private final int length;
+    
+    public ProcessPacketTask(Endpoint endpoint, byte[] data, int length) {
+        this.endpoint = endpoint;
+        this.data = data;
+        this.length = length;
+    }
+    
+    public void run() {
+        MetricsCollector metrics = endpoint.getMetrics();
+        try {
+            InputStream inputStream = new ByteArrayInputStream(data, 0, 
length);
+            MessageContext msgContext = 
endpoint.getListener().createMessageContext();
+            msgContext.setAxisService(endpoint.getService());
+            SOAPEnvelope envelope = 
TransportUtils.createSOAPMessage(msgContext, inputStream, 
endpoint.getContentType());
+            msgContext.setEnvelope(envelope);
+            AxisEngine.receive(msgContext);
+            metrics.incrementMessagesReceived();
+            metrics.incrementBytesReceived(length);
+        } catch (Exception ex) {
+            metrics.incrementFaultsReceiving();
+            StringBuilder buffer = new StringBuilder("Error during processing 
of UDP packet:\n");
+            Utils.hexDump(buffer, data, length);
+            log.error(buffer.toString(), ex);
+        }
+    }
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPConstants.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPConstants.java?rev=654291&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPConstants.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPConstants.java
 Wed May  7 14:31:29 2008
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.transport.udp;
+
+/**
+ * Utility class defining constants used by the UDP transport.
+ */
+public class UDPConstants {
+    private UDPConstants() {}
+    
+    public static final String TRANSPORT_NAME = "udp";
+    
+    public static final int DEFAULT_MAX_PACKET_SIZE = 1024;
+    
+    public static final String PORT_KEY = "transport.udp.port";
+    public static final String CONTENT_TYPE_KEY = "transport.udp.contentType";
+    public static final String MAX_PACKET_SIZE_KEY = 
"transport.udp.maxPacketSize";
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPListener.java?rev=654291&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPListener.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/UDPListener.java
 Wed May  7 14:31:29 2008
@@ -0,0 +1,152 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.transport.udp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.synapse.transport.base.AbstractTransportListener;
+import org.apache.synapse.transport.base.ManagementSupport;
+
+/**
+ * Transport listener for the UDP protocol.
+ * Services accepting messages using this transport must be configured with the
+ * following parameters:
+ * <dl>
+ *   <dt>transport.udp.port</dt>
+ *   <dd>The UDP port to listen to (required).</dd>
+ *   <dt>transport.udp.contentType</dt>
+ *   <dd>The content type of the messages received (required). This setting
+ *       is used to select the appropriate message builder.</dd>
+ *   <dt>transport.udp.maxPacketSize</dt>
+ *   <dd>The maximum packet size (optional; default 1024). Packets longer
+ *       than the specified length will be truncated.</dd>
+ * </dl>
+ * 
+ * @see org.apache.synapse.transport.udp
+ */
+public class UDPListener extends AbstractTransportListener implements 
ManagementSupport {
+    private final Map<String,Endpoint> endpoints = new 
HashMap<String,Endpoint>();
+    
+    private IODispatcher dispatcher;
+    
+    @Override
+    public void init(ConfigurationContext cfgCtx, TransportInDescription 
transportIn) throws AxisFault {
+        setTransportName(UDPConstants.TRANSPORT_NAME);
+        super.init(cfgCtx, transportIn);
+        try {
+            dispatcher = new IODispatcher(workerPool);
+        } catch (IOException ex) {
+            throw new AxisFault("Unable to create selector", ex);
+        }
+        // Start a new thread for the I/O dispatcher
+        new Thread(dispatcher, getTransportName() + "-dispatcher").start();
+    }
+
+    @Override
+    public void destroy() {
+        super.destroy();
+        try {
+            dispatcher.stop();
+        } catch (IOException ex) {
+            log.error("Failed to stop dispatcher", ex);
+        }
+    }
+
+    @Override
+    protected void startListeningForService(AxisService service) {
+        Parameter param;
+        
+        int port;
+        param = service.getParameter(UDPConstants.PORT_KEY);
+        if (param == null) {
+            log.info("No UDP port number specified for service " + 
service.getName() + "; disabling transport for this service");
+            disableTransportForService(service);
+            return;
+        } else {
+            try {
+                port = Integer.parseInt(param.getValue().toString());
+            }
+            catch (NumberFormatException ex) {
+                log.error("Invalid port number " + param.getValue() + " for 
service " + service.getName());
+                disableTransportForService(service);
+                return;
+            }
+        }
+        
+        int maxPacketSize = UDPConstants.DEFAULT_MAX_PACKET_SIZE;
+        param = service.getParameter(UDPConstants.MAX_PACKET_SIZE_KEY);
+        if (param != null) {
+            try {
+                maxPacketSize = Integer.parseInt(param.getValue().toString());
+            }
+            catch (NumberFormatException ex) {
+                log.warn("Invalid maximum packet size; falling back to default 
value " + maxPacketSize);
+            }
+        }
+        
+        String contentType;
+        param = service.getParameter(UDPConstants.CONTENT_TYPE_KEY);
+        if (param == null) {
+            log.info("No content type specified for service " + 
service.getName() + "; disabling transport for this service");
+            disableTransportForService(service);
+            return;
+        } else {
+            contentType = (String)param.getValue();
+        }
+        
+        Endpoint endpoint = new Endpoint(this, port, contentType, 
maxPacketSize, service, metrics);
+        try {
+            dispatcher.addEndpoint(endpoint);
+        } catch (IOException ex) {
+            log.error("Unable to listen on port " + port, ex);
+            disableTransportForService(service);
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Started listening on port " + port + " [contentType=" + 
contentType + "; maxPacketSize=" + maxPacketSize + "; service=" + 
service.getName() + "]");
+        }
+        endpoints.put(service.getName(), endpoint);
+    }
+
+    @Override
+    protected void stopListeningForService(AxisService service) {
+        try {
+            dispatcher.removeEndpoint(service.getName());
+        } catch (IOException ex) {
+            log.error("I/O exception while stopping listener for service " + 
service.getName(), ex);
+        }
+    }
+
+    public EndpointReference[] getEPRsForService(String serviceName, String 
ip) throws AxisFault {
+        Endpoint endpoint = endpoints.get(serviceName);
+        if (endpoint == null) {
+            return null;
+        } else {
+            return new EndpointReference[] { endpoint.getEndpointReference(ip) 
};
+        }
+    }
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Utils.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Utils.java?rev=654291&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Utils.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/Utils.java
 Wed May  7 14:31:29 2008
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.transport.udp;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Utility class with methods used by the UDP transport.
+ */
+public class Utils {
+    /** Number of bytes shown on each line of a hex dump. */
+    private static final int BYTES_PER_LINE = 16;
+
+    private Utils() {}
+
+    /**
+     * Append a hex dump of a byte array to a buffer.
+     * Every output line covers up to 16 bytes and consists of the bytes
+     * as two-digit lower-case hexadecimal numbers (in two groups of
+     * eight), followed by the same bytes as ASCII characters enclosed in
+     * <tt>|</tt>. Bytes that are not printable ASCII are shown as
+     * <tt>.</tt>; the last line is padded with spaces. Nothing is
+     * appended if <code>length</code> is not positive.
+     *
+     * @param buffer the buffer to append the dump to
+     * @param data   the array containing the bytes to dump
+     * @param length the number of bytes to dump, starting at index 0
+     */
+    public static void hexDump(StringBuilder buffer, byte[] data, int length) {
+        for (int start = 0; start < length; start += BYTES_PER_LINE) {
+            for (int i = 0; i < BYTES_PER_LINE; i++) {
+                int index = start + i;
+                if (index < length) {
+                    // Mask to an unsigned value: without it, negative
+                    // bytes are sign-extended and print as eight digits.
+                    int b = data[index] & 0xFF;
+                    buffer.append(Character.forDigit(b >> 4, 16));
+                    buffer.append(Character.forDigit(b & 0x0F, 16));
+                } else {
+                    buffer.append("  ");
+                }
+                buffer.append(' ');
+                // Extra gap between the two groups of eight bytes
+                if (i == BYTES_PER_LINE / 2 - 1) {
+                    buffer.append(' ');
+                }
+            }
+            buffer.append(" |");
+            for (int i = 0; i < BYTES_PER_LINE; i++) {
+                int index = start + i;
+                if (index < length) {
+                    int b = data[index] & 0xFF;
+                    // 0x20..0x7E are the printable ASCII characters
+                    // (0x7F is the DEL control character).
+                    if (32 <= b && b < 127) {
+                        buffer.append((char)b);
+                    } else {
+                        buffer.append('.');
+                    }
+                } else {
+                    buffer.append(' ');
+                }
+            }
+            buffer.append('|');
+            buffer.append('\n');
+        }
+    }
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/package-info.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/package-info.java?rev=654291&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/package-info.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/udp/package-info.java
 Wed May  7 14:31:29 2008
@@ -0,0 +1,65 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+/**
+ * Transport implementation for the UDP protocol.
+ * <p>
+ * This package contains a transport listener implementation allowing Axis to
+ * receive and process UDP packets. It is an implementation of "raw" UDP in the
+ * sense that the message is directly extracted from the UDP payload without
+ * any intermediate application protocol. This has two important implications:
+ * <ul>
+ *   <li>The only way to route the incoming message to the appropriate Axis service
+ *       is to bind the service to a specific UDP port. The port number must be
+ *       explicitly defined in the service configuration. This is different
+ *       from protocols such as HTTP where the message can be routed
+ *       based on the URL in the request.</li>
+ *   <li>The transport has no way to detect the content type of an incoming
+ *       message. Indeed, there is no equivalent to HTTP's
+ *       <tt>Content-Type</tt> header. Again the expected content type must be
+ *       configured explicitly for the service.</li>
+ * </ul>
+ * See the documentation of {@link org.apache.synapse.transport.udp.UDPListener}
+ * for more information about how to configure a service to accept UDP packets.
+ * <p>
+ * It should also be noted that given its characteristics, UDP is not a
+ * suitable transport protocol for SOAP, except maybe in very particular
+ * circumstances. Indeed, UDP is an unreliable protocol:
+ * <ul>
+ *   <li>There is no delivery guarantee, i.e. packets may be lost.</li>
+ *   <li>Messages may arrive out of order.</li>
+ *   <li>Messages may be duplicated, i.e. delivered twice.</li>
+ * </ul>
+ * This transport implementation is useful mainly to integrate Axis (and in
+ * particular Synapse) with existing UDP based protocols. See
+ * {@link org.apache.synapse.format.syslog} for an example of this kind
+ * of protocol.
+ * 
+ * <h4>Known issues</h4>
+ * 
+ * <ul>
+ *   <li>Packets longer than the configured maximum packet size
+ *       are silently truncated. Packet truncation should be detected
+ *       and trigger an error.</li>
+ *   <li>The listener doesn't implement all management operations
+ *       specified by
+ *       {@link org.apache.synapse.transport.base.ManagementSupport}.</li>
+ * </ul>
+ */
+package org.apache.synapse.transport.udp;
\ No newline at end of file


Reply via email to