Author: supun
Date: Mon Feb 21 13:23:54 2011
New Revision: 1072963

URL: http://svn.apache.org/viewvc?rev=1072963&view=rev
Log:
Add a listener context to hold some of the listener's configuration;
this refactors some methods to take fewer parameters

Added:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
Modified:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=1072963&r1=1072962&r2=1072963&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
 Mon Feb 21 13:23:54 2011
@@ -19,8 +19,6 @@
 package org.apache.synapse.transport.nhttp;
 
 import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.impl.builder.StAXOMBuilder;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -39,36 +37,20 @@ import org.apache.commons.logging.LogFac
 import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
 import org.apache.http.impl.nio.reactor.SSLIOSessionHandler;
 import org.apache.http.nio.NHttpServiceHandler;
-import org.apache.http.nio.params.NIOReactorPNames;
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
 import org.apache.http.nio.reactor.ListenerEndpoint;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
-import org.apache.http.params.HttpProtocolParams;
-import org.apache.synapse.commons.executors.PriorityExecutor;
-import org.apache.synapse.commons.executors.ExecutorConstants;
-import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
-import org.apache.synapse.commons.evaluators.Parser;
-import org.apache.synapse.commons.evaluators.EvaluatorException;
-import org.apache.synapse.commons.evaluators.EvaluatorConstants;
 import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
 
 import javax.net.ssl.SSLContext;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.namespace.QName;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * NIO transport listener for Axis2 based on HttpCore and NIO extensions
@@ -76,11 +58,6 @@ import java.util.Properties;
 public class HttpCoreNIOListener implements TransportListener, 
ManagementSupport {
 
     private static final Log log = 
LogFactory.getLog(HttpCoreNIOListener.class);
-
-    /** The Axis2 configuration context */
-    private ConfigurationContext cfgCtx;
-    /** The Axis2 Transport In Description for the transport */
-    private TransportInDescription transportIn;
     /** The IOReactor */
     private DefaultListeningIOReactor ioReactor = null;
 
@@ -94,32 +71,20 @@ public class HttpCoreNIOListener impleme
     private Map<String, String> eprToServiceNameMap = new HashMap<String, 
String>();
     /** the axis observer that gets notified of service life cycle events*/
     private final AxisObserver axisObserver = new GenericAxisObserver();
-    /** The port to listen on, defaults to 8280 */
-    private int port = 8280;
-    /** The hostname to use, defaults to localhost */
-    private String host = "localhost";
-    /** The bind addresses as (address, port) pairs */
-    private String bindAddress = null;
     /** SSLContext if this listener is a SSL listener */
     private SSLContext sslContext = null;
     /** The SSL session handler that manages client authentication etc */
     private SSLIOSessionHandler sslIOSessionHandler = null;
     /** JMX support */
     private TransportMBeanSupport mbeanSupport;
-    /** Metrics collector for this transport */
-    private NhttpMetricsCollector metrics = null;
     /** state of the listener */
     private volatile int state = BaseConstants.STOPPED;
     /** The ServerHandler */
     private ServerHandler handler = null;
-    /** This will execute the requests based on calculate priority */
-    private PriorityExecutor executor = null;
-    /** parser for calculating the priority of incoming messages */
-    private Parser parser = null;
-    /** if falses we won't dispatch to axis2 service in case of rest scenarios 
*/
-    private boolean restDispatching = true;
-    /** WSDL processor for Get requests*/
-    private HttpGetRequestProcessor httpGetRequestProcessor = null;
+    /** Listener configurations */
+    private ListenerContext listenerContext;
+    /** Metrics */
+    private NhttpMetricsCollector metrics = null;
 
     protected IOEventDispatch getEventDispatch(
         NHttpServiceHandler handler, SSLContext sslContext, 
@@ -128,73 +93,33 @@ public class HttpCoreNIOListener impleme
     }
 
     /**
-     * get HTTP protocol parameters to which the listener must adhere to
-     * @return the applicable HTTP protocol parameters
-     */
-    private HttpParams getServerParameters() {
-        HttpParams params = new BasicHttpParams();
-        NHttpConfiguration cfg = NHttpConfiguration.getInstance();
-        params
-            .setIntParameter(HttpConnectionParams.SO_TIMEOUT,
-                cfg.getProperty(HttpConnectionParams.SO_TIMEOUT, 60000))
-            .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE,
-                cfg.getProperty(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 
1024))
-            .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK,
-                cfg.getProperty(HttpConnectionParams.STALE_CONNECTION_CHECK, 
0) == 1)
-            .setBooleanParameter(HttpConnectionParams.TCP_NODELAY,
-                cfg.getProperty(HttpConnectionParams.TCP_NODELAY, 1) == 1)
-            .setParameter(HttpProtocolParams.ORIGIN_SERVER, 
"Synapse-HttpComponents-NIO");
-
-        if (cfg.getBooleanValue(NIOReactorPNames.INTEREST_OPS_QUEUEING, 
false)) {
-            params.setBooleanParameter(NIOReactorPNames.INTEREST_OPS_QUEUEING, 
true);
-        }
-        return params;
-    }
-
-    /**
      * Initialize the transport listener, and execute reactor in new seperate 
thread
      * @param cfgCtx the Axis2 configuration context
-     * @param transprtIn the description of the http/s transport from Axis2 
configuration
+     * @param transportIn the description of the http/s transport from Axis2 
configuration
      * @throws AxisFault on error
      */
-    public void init(ConfigurationContext cfgCtx, TransportInDescription 
transprtIn)
+    public void init(ConfigurationContext cfgCtx, TransportInDescription 
transportIn)
             throws AxisFault {
 
-        this.cfgCtx = cfgCtx;
-        this.transportIn = transprtIn;
         cfgCtx.setProperty(NhttpConstants.EPR_TO_SERVICE_NAME_MAP, 
eprToServiceNameMap);
-        Parameter param = transprtIn.getParameter(PARAM_PORT);
-        if (param != null) {
-            port = Integer.parseInt((String) param.getValue());
-        }
 
-        param = transprtIn.getParameter(NhttpConstants.BIND_ADDRESS);
-        if (param != null) {
-            bindAddress = ((String) param.getValue()).trim();
-        }
+        // is this an SSL listener?
+        sslContext = getSSLContext(transportIn);
+        sslIOSessionHandler = getSSLIOSessionHandler(transportIn);
 
-        param = transprtIn.getParameter(HOST_ADDRESS);
-        if (param != null) {
-            host = ((String) param.getValue()).trim();
-        } else {
-            try {
-                host = java.net.InetAddress.getLocalHost().getHostName();
-            } catch (UnknownHostException e) {
-                log.warn("Unable to lookup local host name, using 
'localhost'");
-            }
-        }
+        listenerContext = new ListenerContext(cfgCtx, transportIn, sslContext 
!= null);
+        listenerContext.build();
 
-        // is this an SSL listener?
-        sslContext = getSSLContext(transprtIn);
-        sslIOSessionHandler = getSSLIOSessionHandler(transprtIn);
+        metrics = listenerContext.getMetrics();
 
-        param = transprtIn.getParameter(NhttpConstants.WSDL_EPR_PREFIX);
+        Parameter param = 
transportIn.getParameter(NhttpConstants.WSDL_EPR_PREFIX);
         if (param != null) {
             serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, (String) 
param.getValue());
             customEPRPrefix = (String) param.getValue();
         } else {
-            serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, host, port);
-            customEPRPrefix = transprtIn.getName() + "://" + host + ":" + 
(port == 80 ? "" : port) + "/";
+            serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, 
listenerContext.getHost(), listenerContext.getPort());
+            customEPRPrefix = transportIn.getName() + "://" + 
listenerContext.getHost() +
+                    ":" + (listenerContext.getPort() == 80 ? "" : 
listenerContext.getPort()) + "/";
         }
 
         // register to receive updates on services for lifetime management
@@ -204,115 +129,12 @@ public class HttpCoreNIOListener impleme
         mbeanSupport
             = new TransportMBeanSupport(this, "nio-http" + (sslContext == null 
? "" : "s"));
         mbeanSupport.register();
-        metrics = new NhttpMetricsCollector(true, sslContext != null);
-
-        // create the priority based executor and parser
-        param = 
transprtIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
-        if (param != null && param.getValue() != null) {
-            createPriorityConfiguration(param.getValue().toString());
-        }
-
-        param = 
transprtIn.getParameter(NhttpConstants.DISABLE_REST_SERVICE_DISPATCHING);
-        if (param != null && param.getValue() != null) {
-            if (param.getValue().equals("true")) {
-                restDispatching = false;
-            }
-        }
-
-        // create http Get processor
-        param = transprtIn.getParameter(NhttpConstants.HTTP_GET_PROCESSOR);
-        if (param != null && param.getValue() != null) {
-            httpGetRequestProcessor = 
createHttpGetProcessor(param.getValue().toString());
-            if (httpGetRequestProcessor == null) {
-                handleException("Cannot create HttpGetRequestProcessor");
-            }
-        } else {
-            httpGetRequestProcessor = new DefaultHttpGetProcessor();
-        }
-    }
-
-    private HttpGetRequestProcessor createHttpGetProcessor(String str) throws 
AxisFault {
-        Object obj = null;
-        try {
-            obj = Class.forName(str).newInstance();
-        } catch (ClassNotFoundException e) {
-            handleException("Error creating WSDL processor", e);
-        } catch (InstantiationException e) {
-            handleException("Error creating WSDL processor", e);
-        } catch (IllegalAccessException e) {
-            handleException("Error creating WSDL processor", e);
-        }
-
-        if (obj instanceof HttpGetRequestProcessor) {
-            return (HttpGetRequestProcessor) obj;
-        } else {
-            handleException("Error creating WSDL processor. The HttpProcessor 
should be of type " +
-                    
"org.apache.synapse.transport.nhttp.HttpGetRequestProcessor");
-        }
-
-        return null;
     }
 
     public int getActiveConnectionsSize() {
         return handler.getActiveConnectionsSize();
     }
 
-    /**
-     * Create a priority executor from the given file 
-     *
-     * @param fileName file name of the executor configuration
-     * @throws AxisFault if an error occurs
-     */
-    private void createPriorityConfiguration(String fileName) throws AxisFault 
{
-        OMElement definitions = null;
-        try {
-            FileInputStream fis = new FileInputStream(fileName);
-            definitions = new StAXOMBuilder(fis).getDocumentElement();
-            definitions.build();
-        } catch (FileNotFoundException e) {
-            handleException("Priority configuration file cannot be found : " + 
fileName, e);
-        } catch (XMLStreamException e) {
-            handleException("Error parsing priority configuration xml file " + 
fileName, e);
-        }
-
-        assert definitions != null;
-        OMElement executorElem = definitions.getFirstChildWithName(
-                new QName(ExecutorConstants.PRIORITY_EXECUTOR));
-
-        if (executorElem == null) {
-            handleException(ExecutorConstants.PRIORITY_EXECUTOR +
-                    " configuration is mandatory for priority based routing");
-        }
-
-        executor = PriorityExecutorFactory.createExecutor(
-                null, executorElem, false, new Properties());
-        OMElement conditionsElem = definitions.getFirstChildWithName(
-                new QName(EvaluatorConstants.CONDITIONS));
-        if (conditionsElem == null) {
-            handleException("Conditions configuration is mandatory for 
priority based routing");
-        }
-
-        executor.init();
-
-        assert conditionsElem != null;
-        OMAttribute defPriorityAttr = conditionsElem.getAttribute(
-                new QName(EvaluatorConstants.DEFAULT_PRIORITY));
-        if (defPriorityAttr != null) {
-            parser = new 
Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
-        } else {
-            parser = new Parser();
-        }
-
-        try {
-            parser.init(conditionsElem);
-        } catch (EvaluatorException e) {
-            handleException("Invalid " + EvaluatorConstants.CONDITIONS +
-                    " configuration for priority based mediation", e);
-        }
-
-        log.info("Created a priority based executor from the configuration: " +
-                fileName);
-    }
 
     /**
      * Return the EPR prefix for services made available over this transport
@@ -374,7 +196,7 @@ public class HttpCoreNIOListener impleme
         }
         
         // configure the IO reactor on the specified port
-        HttpParams params = getServerParameters();
+        HttpParams params = listenerContext.getParams();
         try {
             String prefix = (sslContext == null ? "http" : "https") + 
"-Listener I/O dispatcher";
             ioReactor = new DefaultListeningIOReactor(
@@ -398,25 +220,26 @@ public class HttpCoreNIOListener impleme
             handleException("Error starting the IOReactor", e);
         }
 
+        ConfigurationContext cfgCtx = listenerContext.getCfgCtx();
+
         for (Object obj : 
cfgCtx.getAxisConfiguration().getServices().values()) {
             addToServiceURIMap((AxisService) obj);
         }
-        
-        handler = new ServerHandler(cfgCtx, params, sslContext != null
-                , metrics, parser, executor, restDispatching, 
httpGetRequestProcessor);
+
+        handler = new ServerHandler(listenerContext);
         final IOEventDispatch ioEventDispatch = getEventDispatch(handler,
                 sslContext, sslIOSessionHandler, params);
         state = BaseConstants.STARTED;
 
-        httpGetRequestProcessor.init(cfgCtx,handler);
+        listenerContext.getHttpGetRequestProcessor().init(cfgCtx, handler);
 
         ListenerEndpoint endpoint;
         try {
-            if (bindAddress == null) {
-                endpoint = ioReactor.listen(new InetSocketAddress(port));
+            if (listenerContext.getBindAddress() == null) {
+                endpoint = ioReactor.listen(new 
InetSocketAddress(listenerContext.getPort()));
             } else {
                 endpoint = ioReactor.listen(new InetSocketAddress(
-                    InetAddress.getByName(bindAddress), port));
+                    InetAddress.getByName(listenerContext.getBindAddress()), 
listenerContext.getPort()));
             }
         } catch (IOException e) {
             handleException("Encountered an I/O error: " + e.getMessage(), e);
@@ -448,9 +271,10 @@ public class HttpCoreNIOListener impleme
         } catch (InterruptedException e) {
             log.warn("HttpCoreNIOListener#start() was interrupted");
         }
-        
+
         log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener started 
on" +
-            (bindAddress != null ? " address : " + bindAddress : "") + " port 
: " + port);
+                (listenerContext.getBindAddress() != null ? " address : " + 
listenerContext.getBindAddress() : "") +
+                " port : " + listenerContext.getPort());
     }
 
     private void addToServiceURIMap(AxisService service) {
@@ -480,7 +304,7 @@ public class HttpCoreNIOListener impleme
             ioReactor.shutdown();
             handler.stop();
             state = BaseConstants.STOPPED;
-            for (Object obj : 
cfgCtx.getAxisConfiguration().getServices().values()) {
+            for (Object obj : 
listenerContext.getCfgCtx().getAxisConfiguration().getServices().values()) {
                 removeServiceFfromURIMap((AxisService) obj);
             }
         } catch (IOException e) {
@@ -626,7 +450,7 @@ public class HttpCoreNIOListener impleme
 
     public void destroy() {
         ioReactor = null;
-        cfgCtx.getAxisConfiguration().getObserversList().remove(axisObserver);
+        
listenerContext.getCfgCtx().getAxisConfiguration().getObserversList().remove(axisObserver);
         mbeanSupport.unregister();
         metrics.destroy();
     }
@@ -644,7 +468,7 @@ public class HttpCoreNIOListener impleme
         public void serviceUpdate(AxisEvent event, AxisService service) {
 
             if (!ignoreService(service)
-                    && BaseUtils.isUsingTransport(service, 
transportIn.getName())) {
+                    && BaseUtils.isUsingTransport(service, 
listenerContext.getTransportIn().getName())) {
                 switch (event.getEventType()) {
                     case AxisEvent.SERVICE_DEPLOY :
                         addToServiceURIMap(service);
@@ -684,11 +508,6 @@ public class HttpCoreNIOListener impleme
         throw new AxisFault(msg, e);
     }
 
-    private void handleException(String msg) throws AxisFault {
-        log.error(msg);
-        throw new AxisFault(msg);
-    }
-
     // -- jmx/management methods--
     public long getMessagesReceived() {
         if (metrics != null) {

Added: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java?rev=1072963&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
 (added)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
 Mon Feb 21 13:23:54 2011
@@ -0,0 +1,305 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.transport.nhttp;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.nio.params.NIOReactorPNames;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
+import org.apache.synapse.commons.evaluators.EvaluatorConstants;
+import org.apache.synapse.commons.evaluators.EvaluatorException;
+import org.apache.synapse.commons.evaluators.Parser;
+import org.apache.synapse.commons.executors.ExecutorConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
+import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+/**
+ * This class is being used to hold the different runtime objects used by the 
Listeners
+ */
+public class ListenerContext {
+    private Log log = LogFactory.getLog(ListenerContext.class);
+
+    /** The Axis2 configuration context */
+    private ConfigurationContext cfgCtx;
+    /** The Axis2 Transport In Description for the transport */
+    private TransportInDescription transportIn;
+    /** Whether this listener is an SSL listener */
+    private boolean ssl = false;
+    /** Metrics collector for this transport */
+    private NhttpMetricsCollector metrics = null;
+    /** This will execute the requests based on the calculated priority */
+    private PriorityExecutor executor = null;
+    /** parser for calculating the priority of incoming messages */
+    private Parser parser = null;
+    /** if false we won't dispatch to axis2 service in case of rest scenarios 
*/
+    private boolean restDispatching = true;
+    /** WSDL processor for Get requests*/
+    private HttpGetRequestProcessor httpGetRequestProcessor = null;
+    /** The port to listen on, defaults to 8280 */
+    private int port = 8280;
+    /** The hostname to use, defaults to localhost */
+    private String host = "localhost";
+    /** The address to bind the listener to; when null, only the port is used for binding */
+    private String bindAddress = null;
+
+    private HttpParams params = null;
+
+
+    public ListenerContext(ConfigurationContext cfgCtx,
+                           TransportInDescription transportIn,
+                           boolean ssl) {
+        this.cfgCtx = cfgCtx;
+        this.transportIn = transportIn;
+        this.ssl = ssl;
+    }
+
+    public void build() throws AxisFault {
+        Parameter param = 
transportIn.getParameter(TransportListener.PARAM_PORT);
+        if (param != null) {
+            port = Integer.parseInt((String) param.getValue());
+        }
+
+        param = transportIn.getParameter(NhttpConstants.BIND_ADDRESS);
+        if (param != null) {
+            bindAddress = ((String) param.getValue()).trim();
+        }
+
+        param = transportIn.getParameter(TransportListener.HOST_ADDRESS);
+        if (param != null) {
+            host = ((String) param.getValue()).trim();
+        } else {
+            try {
+                host = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                log.warn("Unable to lookup local host name, using 
'localhost'");
+            }
+        }
+
+        metrics = new NhttpMetricsCollector(true, ssl);
+
+        // create the priority based executor and parser
+        param = 
transportIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
+        if (param != null && param.getValue() != null) {
+            createPriorityConfiguration(param.getValue().toString());
+        }
+
+        param = 
transportIn.getParameter(NhttpConstants.DISABLE_REST_SERVICE_DISPATCHING);
+        if (param != null && param.getValue() != null) {
+            if (param.getValue().equals("true")) {
+                restDispatching = false;
+            }
+        }
+
+        // create http Get processor
+        param = transportIn.getParameter(NhttpConstants.HTTP_GET_PROCESSOR);
+        if (param != null && param.getValue() != null) {
+            httpGetRequestProcessor = 
createHttpGetProcessor(param.getValue().toString());
+            if (httpGetRequestProcessor == null) {
+                handleException("Cannot create HttpGetRequestProcessor");
+            }
+        } else {
+            httpGetRequestProcessor = new DefaultHttpGetProcessor();
+        }
+
+        params = getListenerParameters();
+    }
+
+    /**
+     * Create a priority executor from the given file
+     *
+     * @param fileName file name of the executor configuration
+     * @throws org.apache.axis2.AxisFault if an error occurs
+     */
+    private void createPriorityConfiguration(String fileName) throws AxisFault 
{
+        OMElement definitions = null;
+        try {
+            FileInputStream fis = new FileInputStream(fileName);
+            definitions = new StAXOMBuilder(fis).getDocumentElement();
+            definitions.build();
+        } catch (FileNotFoundException e) {
+            handleException("Priority configuration file cannot be found : " + 
fileName, e);
+        } catch (XMLStreamException e) {
+            handleException("Error parsing priority configuration xml file " + 
fileName, e);
+        }
+
+        assert definitions != null;
+        OMElement executorElem = definitions.getFirstChildWithName(
+                new QName(ExecutorConstants.PRIORITY_EXECUTOR));
+
+        if (executorElem == null) {
+            handleException(ExecutorConstants.PRIORITY_EXECUTOR +
+                    " configuration is mandatory for priority based routing");
+        }
+
+        executor = PriorityExecutorFactory.createExecutor(
+                null, executorElem, false, new Properties());
+        OMElement conditionsElem = definitions.getFirstChildWithName(
+                new QName(EvaluatorConstants.CONDITIONS));
+        if (conditionsElem == null) {
+            handleException("Conditions configuration is mandatory for 
priority based routing");
+        }
+
+        executor.init();
+
+        assert conditionsElem != null;
+        OMAttribute defPriorityAttr = conditionsElem.getAttribute(
+                new QName(EvaluatorConstants.DEFAULT_PRIORITY));
+        if (defPriorityAttr != null) {
+            parser = new 
Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
+        } else {
+            parser = new Parser();
+        }
+
+        try {
+            parser.init(conditionsElem);
+        } catch (EvaluatorException e) {
+            handleException("Invalid " + EvaluatorConstants.CONDITIONS +
+                    " configuration for priority based mediation", e);
+        }
+
+        log.info("Created a priority based executor from the configuration: " +
+                fileName);
+    }
+
+    private HttpGetRequestProcessor createHttpGetProcessor(String str) throws 
AxisFault {
+        Object obj = null;
+        try {
+            obj = Class.forName(str).newInstance();
+        } catch (ClassNotFoundException e) {
+            handleException("Error creating WSDL processor", e);
+        } catch (InstantiationException e) {
+            handleException("Error creating WSDL processor", e);
+        } catch (IllegalAccessException e) {
+            handleException("Error creating WSDL processor", e);
+        }
+
+        if (obj instanceof HttpGetRequestProcessor) {
+            return (HttpGetRequestProcessor) obj;
+        } else {
+            handleException("Error creating WSDL processor. The HttpProcessor 
should be of type " +
+                    
"org.apache.synapse.transport.nhttp.HttpGetRequestProcessor");
+        }
+
+        return null;
+    }
+
+    /**
+     * Get the HTTP protocol parameters to which the listener must adhere
+     * @return the applicable HTTP protocol parameters
+     */
+    private HttpParams getListenerParameters() {
+        HttpParams params = new BasicHttpParams();
+        NHttpConfiguration cfg = NHttpConfiguration.getInstance();
+        params
+            .setIntParameter(HttpConnectionParams.SO_TIMEOUT,
+                cfg.getProperty(HttpConnectionParams.SO_TIMEOUT, 60000))
+            .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE,
+                cfg.getProperty(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 
1024))
+            .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK,
+                cfg.getProperty(HttpConnectionParams.STALE_CONNECTION_CHECK, 
0) == 1)
+            .setBooleanParameter(HttpConnectionParams.TCP_NODELAY,
+                cfg.getProperty(HttpConnectionParams.TCP_NODELAY, 1) == 1)
+            .setParameter(HttpProtocolParams.ORIGIN_SERVER, 
"Synapse-HttpComponents-NIO");
+
+        if (cfg.getBooleanValue(NIOReactorPNames.INTEREST_OPS_QUEUEING, 
false)) {
+            params.setBooleanParameter(NIOReactorPNames.INTEREST_OPS_QUEUEING, 
true);
+        }
+        return params;
+    }
+
+
+    public ConfigurationContext getCfgCtx() {
+        return cfgCtx;
+    }
+
+    public TransportInDescription getTransportIn() {
+        return transportIn;
+    }
+
+    public boolean isSsl() {
+        return ssl;
+    }
+
+    public NhttpMetricsCollector getMetrics() {
+        return metrics;
+    }
+
+    public PriorityExecutor getExecutor() {
+        return executor;
+    }
+
+    public Parser getParser() {
+        return parser;
+    }
+
+    public boolean isRestDispatching() {
+        return restDispatching;
+    }
+
+    public HttpGetRequestProcessor getHttpGetRequestProcessor() {
+        return httpGetRequestProcessor;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getBindAddress() {
+        return bindAddress;
+    }
+
+    public HttpParams getParams() {
+        return params;
+    }
+
+    private void handleException(String msg, Exception e) throws AxisFault {
+        log.error(msg, e);
+        throw new AxisFault(msg, e);
+    }
+
+    private void handleException(String msg) throws AxisFault {
+        log.error(msg);
+        throw new AxisFault(msg);
+    }
+}

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=1072963&r1=1072962&r2=1072963&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
 Mon Feb 21 13:23:54 2011
@@ -121,15 +121,15 @@ public class ServerHandler implements NH
     public static final String CONNECTION_CREATION_TIME = "synapse.connectionCreationTime";
     public static final String SERVER_CONNECTION_DEBUG = "synapse.server-connection-debug";
 
-    public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams params,
-        final boolean isHttps, final NhttpMetricsCollector metrics,
-        Parser parser, PriorityExecutor executor, boolean restDispatching,
-        HttpGetRequestProcessor httpGetRequestProcessor) {
+    private ListenerContext listenerContext = null;
+
+    public ServerHandler(ListenerContext listenerContext) {
         super();
-        this.cfgCtx = cfgCtx;
-        this.params = params;
-        this.isHttps = isHttps;
-        this.metrics = metrics;
+        this.listenerContext = listenerContext;
+        this.cfgCtx = listenerContext.getCfgCtx();
+        this.params = listenerContext.getParams();
+        this.isHttps = listenerContext.isSsl();
+        this.metrics = listenerContext.getMetrics();
         this.responseFactory = new DefaultHttpResponseFactory();
         this.httpProcessor = getHttpProcessor();
         this.connStrategy = new DefaultConnectionReuseStrategy();
@@ -137,10 +137,10 @@ public class ServerHandler implements NH
         this.activeConnections = new ArrayList<NHttpServerConnection>();
         this.latencyView = new LatencyView(isHttps);
         this.threadingView = new ThreadingView("HttpServerWorker", true, 50);
-        this.restDispatching = restDispatching;
+        this.restDispatching = listenerContext.isRestDispatching();
 
         this.cfg = NHttpConfiguration.getInstance();
-        if (executor == null)  {
+        if (listenerContext.getExecutor() == null)  {
             this.workerPool = WorkerPoolFactory.getWorkerPool(
                 cfg.getServerCoreThreads(),
                 cfg.getServerMaxThreads(),
@@ -148,11 +148,11 @@ public class ServerHandler implements NH
                 cfg.getServerQueueLen(),
                 "Server Worker thread group", "HttpServerWorker");
         } else {
-            this.executor = executor;
-            this.parser = parser;
+            this.executor = listenerContext.getExecutor();
+            this.parser = listenerContext.getParser();
         }
 
-        this.httpGetRequestProcessor = httpGetRequestProcessor;
+        this.httpGetRequestProcessor = listenerContext.getHttpGetRequestProcessor();
     }
 
     /**
@@ -209,8 +209,8 @@ public class ServerHandler implements NH
                 metrics.incrementMessagesReceived();
             }
             // hand off processing of the request to a thread off the pool
-            ServerWorker worker = new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
-                        request, is, response, os, restDispatching, httpGetRequestProcessor);
+            ServerWorker worker = new ServerWorker(listenerContext, conn, this,
+                    request, is, response, os);
 
             if (workerPool != null) {
                 workerPool.execute(worker);

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java?rev=1072963&r1=1072962&r2=1072963&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java Mon Feb 21 13:23:54 2011
@@ -95,38 +95,33 @@ public class ServerWorker implements Run
      * its output. This however does not force the processor to write a response back as the
      * traditional servlet service() method, but creates the background required to write the
      * response, if one would be created.
-     * @param cfgCtx the Axis2 configuration context
+     *
+     * @param listenerContext the listener configuration
      * @param conn the underlying http connection
-     * @param isHttps whether https or not
-     * @param metrics metrics for the transport
      * @param serverHandler the handler of the server side messages
      * @param request the http request received (might still be in the process of being streamed)
      * @param is the stream input stream to read the request body
      * @param response the response to be populated if applicable
      * @param os the output stream to write the response body if one is applicable
-     * @param isRestDispatching weather we should dispatch in case of rest
      */
-    public ServerWorker(final ConfigurationContext cfgCtx, final NHttpServerConnection conn,
-        final boolean isHttps,
-        final MetricsCollector metrics,
+    public ServerWorker(ListenerContext listenerContext,
+                        final NHttpServerConnection conn,
         final ServerHandler serverHandler,
         final HttpRequest request, final InputStream is,
-        final HttpResponse response, final OutputStream os,
-        final boolean isRestDispatching,
-        final HttpGetRequestProcessor httpGetRequestProcessor) {
+        final HttpResponse response, final OutputStream os) {
 
-        this.cfgCtx = cfgCtx;
+        this.cfgCtx = listenerContext.getCfgCtx();
         this.conn = conn;
-        this.isHttps = isHttps;
-        this.metrics = metrics;
+        this.isHttps = listenerContext.isSsl();
+        this.metrics = listenerContext.getMetrics();
         this.serverHandler = serverHandler;
         this.request = request;
         this.response = response;
         this.is = is;
         this.os = os;
         this.msgContext = createMessageContext(request);
-        this.isRestDispatching = isRestDispatching;
-        this.httpGetRequestProcessor = httpGetRequestProcessor;
+        this.isRestDispatching = listenerContext.isRestDispatching();
+        this.httpGetRequestProcessor = listenerContext.getHttpGetRequestProcessor();
     }
 
     /**


Reply via email to