Author: supun
Date: Mon Feb 21 13:23:54 2011
New Revision: 1072963
URL: http://svn.apache.org/viewvc?rev=1072963&view=rev
Log:
adding a listener context to hold some of the configurations of the listener,
this re-factors some methods to have less parameters
Added:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=1072963&r1=1072962&r2=1072963&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
Mon Feb 21 13:23:54 2011
@@ -19,8 +19,6 @@
package org.apache.synapse.transport.nhttp;
import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
@@ -39,36 +37,20 @@ import org.apache.commons.logging.LogFac
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.SSLIOSessionHandler;
import org.apache.http.nio.NHttpServiceHandler;
-import org.apache.http.nio.params.NIOReactorPNames;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.nio.reactor.ListenerEndpoint;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
-import org.apache.http.params.HttpProtocolParams;
-import org.apache.synapse.commons.executors.PriorityExecutor;
-import org.apache.synapse.commons.executors.ExecutorConstants;
-import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
-import org.apache.synapse.commons.evaluators.Parser;
-import org.apache.synapse.commons.evaluators.EvaluatorException;
-import org.apache.synapse.commons.evaluators.EvaluatorConstants;
import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
import javax.net.ssl.SSLContext;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.namespace.QName;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
/**
* NIO transport listener for Axis2 based on HttpCore and NIO extensions
@@ -76,11 +58,6 @@ import java.util.Properties;
public class HttpCoreNIOListener implements TransportListener,
ManagementSupport {
private static final Log log =
LogFactory.getLog(HttpCoreNIOListener.class);
-
- /** The Axis2 configuration context */
- private ConfigurationContext cfgCtx;
- /** The Axis2 Transport In Description for the transport */
- private TransportInDescription transportIn;
/** The IOReactor */
private DefaultListeningIOReactor ioReactor = null;
@@ -94,32 +71,20 @@ public class HttpCoreNIOListener impleme
private Map<String, String> eprToServiceNameMap = new HashMap<String,
String>();
/** the axis observer that gets notified of service life cycle events*/
private final AxisObserver axisObserver = new GenericAxisObserver();
- /** The port to listen on, defaults to 8280 */
- private int port = 8280;
- /** The hostname to use, defaults to localhost */
- private String host = "localhost";
- /** The bind addresses as (address, port) pairs */
- private String bindAddress = null;
/** SSLContext if this listener is a SSL listener */
private SSLContext sslContext = null;
/** The SSL session handler that manages client authentication etc */
private SSLIOSessionHandler sslIOSessionHandler = null;
/** JMX support */
private TransportMBeanSupport mbeanSupport;
- /** Metrics collector for this transport */
- private NhttpMetricsCollector metrics = null;
/** state of the listener */
private volatile int state = BaseConstants.STOPPED;
/** The ServerHandler */
private ServerHandler handler = null;
- /** This will execute the requests based on calculate priority */
- private PriorityExecutor executor = null;
- /** parser for calculating the priority of incoming messages */
- private Parser parser = null;
- /** if falses we won't dispatch to axis2 service in case of rest scenarios
*/
- private boolean restDispatching = true;
- /** WSDL processor for Get requests*/
- private HttpGetRequestProcessor httpGetRequestProcessor = null;
+ /** Listener configurations */
+ private ListenerContext listenerContext;
+ /** Metrics */
+ private NhttpMetricsCollector metrics = null;
protected IOEventDispatch getEventDispatch(
NHttpServiceHandler handler, SSLContext sslContext,
@@ -128,73 +93,33 @@ public class HttpCoreNIOListener impleme
}
/**
- * get HTTP protocol parameters to which the listener must adhere to
- * @return the applicable HTTP protocol parameters
- */
- private HttpParams getServerParameters() {
- HttpParams params = new BasicHttpParams();
- NHttpConfiguration cfg = NHttpConfiguration.getInstance();
- params
- .setIntParameter(HttpConnectionParams.SO_TIMEOUT,
- cfg.getProperty(HttpConnectionParams.SO_TIMEOUT, 60000))
- .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE,
- cfg.getProperty(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 *
1024))
- .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK,
- cfg.getProperty(HttpConnectionParams.STALE_CONNECTION_CHECK,
0) == 1)
- .setBooleanParameter(HttpConnectionParams.TCP_NODELAY,
- cfg.getProperty(HttpConnectionParams.TCP_NODELAY, 1) == 1)
- .setParameter(HttpProtocolParams.ORIGIN_SERVER,
"Synapse-HttpComponents-NIO");
-
- if (cfg.getBooleanValue(NIOReactorPNames.INTEREST_OPS_QUEUEING,
false)) {
- params.setBooleanParameter(NIOReactorPNames.INTEREST_OPS_QUEUEING,
true);
- }
- return params;
- }
-
- /**
* Initialize the transport listener, and execute reactor in new seperate
thread
* @param cfgCtx the Axis2 configuration context
- * @param transprtIn the description of the http/s transport from Axis2
configuration
+ * @param transportIn the description of the http/s transport from Axis2
configuration
* @throws AxisFault on error
*/
- public void init(ConfigurationContext cfgCtx, TransportInDescription
transprtIn)
+ public void init(ConfigurationContext cfgCtx, TransportInDescription
transportIn)
throws AxisFault {
- this.cfgCtx = cfgCtx;
- this.transportIn = transprtIn;
cfgCtx.setProperty(NhttpConstants.EPR_TO_SERVICE_NAME_MAP,
eprToServiceNameMap);
- Parameter param = transprtIn.getParameter(PARAM_PORT);
- if (param != null) {
- port = Integer.parseInt((String) param.getValue());
- }
- param = transprtIn.getParameter(NhttpConstants.BIND_ADDRESS);
- if (param != null) {
- bindAddress = ((String) param.getValue()).trim();
- }
+ // is this an SSL listener?
+ sslContext = getSSLContext(transportIn);
+ sslIOSessionHandler = getSSLIOSessionHandler(transportIn);
- param = transprtIn.getParameter(HOST_ADDRESS);
- if (param != null) {
- host = ((String) param.getValue()).trim();
- } else {
- try {
- host = java.net.InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- log.warn("Unable to lookup local host name, using
'localhost'");
- }
- }
+ listenerContext = new ListenerContext(cfgCtx, transportIn, sslContext
!= null);
+ listenerContext.build();
- // is this an SSL listener?
- sslContext = getSSLContext(transprtIn);
- sslIOSessionHandler = getSSLIOSessionHandler(transprtIn);
+ metrics = listenerContext.getMetrics();
- param = transprtIn.getParameter(NhttpConstants.WSDL_EPR_PREFIX);
+ Parameter param =
transportIn.getParameter(NhttpConstants.WSDL_EPR_PREFIX);
if (param != null) {
serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, (String)
param.getValue());
customEPRPrefix = (String) param.getValue();
} else {
- serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, host, port);
- customEPRPrefix = transprtIn.getName() + "://" + host + ":" +
(port == 80 ? "" : port) + "/";
+ serviceEPRPrefix = getServiceEPRPrefix(cfgCtx,
listenerContext.getHost(), listenerContext.getPort());
+ customEPRPrefix = transportIn.getName() + "://" +
listenerContext.getHost() +
+ ":" + (listenerContext.getPort() == 80 ? "" :
listenerContext.getPort()) + "/";
}
// register to receive updates on services for lifetime management
@@ -204,115 +129,12 @@ public class HttpCoreNIOListener impleme
mbeanSupport
= new TransportMBeanSupport(this, "nio-http" + (sslContext == null
? "" : "s"));
mbeanSupport.register();
- metrics = new NhttpMetricsCollector(true, sslContext != null);
-
- // create the priority based executor and parser
- param =
transprtIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
- if (param != null && param.getValue() != null) {
- createPriorityConfiguration(param.getValue().toString());
- }
-
- param =
transprtIn.getParameter(NhttpConstants.DISABLE_REST_SERVICE_DISPATCHING);
- if (param != null && param.getValue() != null) {
- if (param.getValue().equals("true")) {
- restDispatching = false;
- }
- }
-
- // create http Get processor
- param = transprtIn.getParameter(NhttpConstants.HTTP_GET_PROCESSOR);
- if (param != null && param.getValue() != null) {
- httpGetRequestProcessor =
createHttpGetProcessor(param.getValue().toString());
- if (httpGetRequestProcessor == null) {
- handleException("Cannot create HttpGetRequestProcessor");
- }
- } else {
- httpGetRequestProcessor = new DefaultHttpGetProcessor();
- }
- }
-
- private HttpGetRequestProcessor createHttpGetProcessor(String str) throws
AxisFault {
- Object obj = null;
- try {
- obj = Class.forName(str).newInstance();
- } catch (ClassNotFoundException e) {
- handleException("Error creating WSDL processor", e);
- } catch (InstantiationException e) {
- handleException("Error creating WSDL processor", e);
- } catch (IllegalAccessException e) {
- handleException("Error creating WSDL processor", e);
- }
-
- if (obj instanceof HttpGetRequestProcessor) {
- return (HttpGetRequestProcessor) obj;
- } else {
- handleException("Error creating WSDL processor. The HttpProcessor
should be of type " +
-
"org.apache.synapse.transport.nhttp.HttpGetRequestProcessor");
- }
-
- return null;
}
public int getActiveConnectionsSize() {
return handler.getActiveConnectionsSize();
}
- /**
- * Create a priority executor from the given file
- *
- * @param fileName file name of the executor configuration
- * @throws AxisFault if an error occurs
- */
- private void createPriorityConfiguration(String fileName) throws AxisFault
{
- OMElement definitions = null;
- try {
- FileInputStream fis = new FileInputStream(fileName);
- definitions = new StAXOMBuilder(fis).getDocumentElement();
- definitions.build();
- } catch (FileNotFoundException e) {
- handleException("Priority configuration file cannot be found : " +
fileName, e);
- } catch (XMLStreamException e) {
- handleException("Error parsing priority configuration xml file " +
fileName, e);
- }
-
- assert definitions != null;
- OMElement executorElem = definitions.getFirstChildWithName(
- new QName(ExecutorConstants.PRIORITY_EXECUTOR));
-
- if (executorElem == null) {
- handleException(ExecutorConstants.PRIORITY_EXECUTOR +
- " configuration is mandatory for priority based routing");
- }
-
- executor = PriorityExecutorFactory.createExecutor(
- null, executorElem, false, new Properties());
- OMElement conditionsElem = definitions.getFirstChildWithName(
- new QName(EvaluatorConstants.CONDITIONS));
- if (conditionsElem == null) {
- handleException("Conditions configuration is mandatory for
priority based routing");
- }
-
- executor.init();
-
- assert conditionsElem != null;
- OMAttribute defPriorityAttr = conditionsElem.getAttribute(
- new QName(EvaluatorConstants.DEFAULT_PRIORITY));
- if (defPriorityAttr != null) {
- parser = new
Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
- } else {
- parser = new Parser();
- }
-
- try {
- parser.init(conditionsElem);
- } catch (EvaluatorException e) {
- handleException("Invalid " + EvaluatorConstants.CONDITIONS +
- " configuration for priority based mediation", e);
- }
-
- log.info("Created a priority based executor from the configuration: " +
- fileName);
- }
/**
* Return the EPR prefix for services made available over this transport
@@ -374,7 +196,7 @@ public class HttpCoreNIOListener impleme
}
// configure the IO reactor on the specified port
- HttpParams params = getServerParameters();
+ HttpParams params = listenerContext.getParams();
try {
String prefix = (sslContext == null ? "http" : "https") +
"-Listener I/O dispatcher";
ioReactor = new DefaultListeningIOReactor(
@@ -398,25 +220,26 @@ public class HttpCoreNIOListener impleme
handleException("Error starting the IOReactor", e);
}
+ ConfigurationContext cfgCtx = listenerContext.getCfgCtx();
+
for (Object obj :
cfgCtx.getAxisConfiguration().getServices().values()) {
addToServiceURIMap((AxisService) obj);
}
-
- handler = new ServerHandler(cfgCtx, params, sslContext != null
- , metrics, parser, executor, restDispatching,
httpGetRequestProcessor);
+
+ handler = new ServerHandler(listenerContext);
final IOEventDispatch ioEventDispatch = getEventDispatch(handler,
sslContext, sslIOSessionHandler, params);
state = BaseConstants.STARTED;
- httpGetRequestProcessor.init(cfgCtx,handler);
+ listenerContext.getHttpGetRequestProcessor().init(cfgCtx, handler);
ListenerEndpoint endpoint;
try {
- if (bindAddress == null) {
- endpoint = ioReactor.listen(new InetSocketAddress(port));
+ if (listenerContext.getBindAddress() == null) {
+ endpoint = ioReactor.listen(new
InetSocketAddress(listenerContext.getPort()));
} else {
endpoint = ioReactor.listen(new InetSocketAddress(
- InetAddress.getByName(bindAddress), port));
+ InetAddress.getByName(listenerContext.getBindAddress()),
listenerContext.getPort()));
}
} catch (IOException e) {
handleException("Encountered an I/O error: " + e.getMessage(), e);
@@ -448,9 +271,10 @@ public class HttpCoreNIOListener impleme
} catch (InterruptedException e) {
log.warn("HttpCoreNIOListener#start() was interrupted");
}
-
+
log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener started
on" +
- (bindAddress != null ? " address : " + bindAddress : "") + " port
: " + port);
+ (listenerContext.getBindAddress() != null ? " address : " +
listenerContext.getBindAddress() : "") +
+ " port : " + listenerContext.getPort());
}
private void addToServiceURIMap(AxisService service) {
@@ -480,7 +304,7 @@ public class HttpCoreNIOListener impleme
ioReactor.shutdown();
handler.stop();
state = BaseConstants.STOPPED;
- for (Object obj :
cfgCtx.getAxisConfiguration().getServices().values()) {
+ for (Object obj :
listenerContext.getCfgCtx().getAxisConfiguration().getServices().values()) {
removeServiceFfromURIMap((AxisService) obj);
}
} catch (IOException e) {
@@ -626,7 +450,7 @@ public class HttpCoreNIOListener impleme
public void destroy() {
ioReactor = null;
- cfgCtx.getAxisConfiguration().getObserversList().remove(axisObserver);
+
listenerContext.getCfgCtx().getAxisConfiguration().getObserversList().remove(axisObserver);
mbeanSupport.unregister();
metrics.destroy();
}
@@ -644,7 +468,7 @@ public class HttpCoreNIOListener impleme
public void serviceUpdate(AxisEvent event, AxisService service) {
if (!ignoreService(service)
- && BaseUtils.isUsingTransport(service,
transportIn.getName())) {
+ && BaseUtils.isUsingTransport(service,
listenerContext.getTransportIn().getName())) {
switch (event.getEventType()) {
case AxisEvent.SERVICE_DEPLOY :
addToServiceURIMap(service);
@@ -684,11 +508,6 @@ public class HttpCoreNIOListener impleme
throw new AxisFault(msg, e);
}
- private void handleException(String msg) throws AxisFault {
- log.error(msg);
- throw new AxisFault(msg);
- }
-
// -- jmx/management methods--
public long getMessagesReceived() {
if (metrics != null) {
Added:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java?rev=1072963&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
(added)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
Mon Feb 21 13:23:54 2011
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.nhttp;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.nio.params.NIOReactorPNames;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
+import org.apache.synapse.commons.evaluators.EvaluatorConstants;
+import org.apache.synapse.commons.evaluators.EvaluatorException;
+import org.apache.synapse.commons.evaluators.Parser;
+import org.apache.synapse.commons.executors.ExecutorConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
+import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+/**
+ * This class is being used to hold the different runtime objects used by the
Listeners
+ */
+public class ListenerContext {
+ private Log log = LogFactory.getLog(ListenerContext.class);
+
+ /** The Axis2 configuration context */
+ private ConfigurationContext cfgCtx;
+ /** The Axis2 Transport In Description for the transport */
+ private TransportInDescription transportIn;
+ /** SSLContext if this listener is a SSL listener */
+ private boolean ssl = false;
+ /** JMX support */
+ private NhttpMetricsCollector metrics = null;
+ /** This will execute the requests based on calculate priority */
+ private PriorityExecutor executor = null;
+ /** parser for calculating the priority of incoming messages */
+ private Parser parser = null;
+ /** if false we won't dispatch to axis2 service in case of rest scenarios
*/
+ private boolean restDispatching = true;
+ /** WSDL processor for Get requests*/
+ private HttpGetRequestProcessor httpGetRequestProcessor = null;
+ /** The port to listen on, defaults to 8280 */
+ private int port = 8280;
+ /** The hostname to use, defaults to localhost */
+ private String host = "localhost";
+ /** The bind addresses as (address, port) pairs */
+ private String bindAddress = null;
+
+ private HttpParams params = null;
+
+
+ public ListenerContext(ConfigurationContext cfgCtx,
+ TransportInDescription transportIn,
+ boolean ssl) {
+ this.cfgCtx = cfgCtx;
+ this.transportIn = transportIn;
+ this.ssl = ssl;
+ }
+
+ public void build() throws AxisFault {
+ Parameter param =
transportIn.getParameter(TransportListener.PARAM_PORT);
+ if (param != null) {
+ port = Integer.parseInt((String) param.getValue());
+ }
+
+ param = transportIn.getParameter(NhttpConstants.BIND_ADDRESS);
+ if (param != null) {
+ bindAddress = ((String) param.getValue()).trim();
+ }
+
+ param = transportIn.getParameter(TransportListener.HOST_ADDRESS);
+ if (param != null) {
+ host = ((String) param.getValue()).trim();
+ } else {
+ try {
+ host = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ log.warn("Unable to lookup local host name, using
'localhost'");
+ }
+ }
+
+ metrics = new NhttpMetricsCollector(true, ssl);
+
+ // create the priority based executor and parser
+ param =
transportIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
+ if (param != null && param.getValue() != null) {
+ createPriorityConfiguration(param.getValue().toString());
+ }
+
+ param =
transportIn.getParameter(NhttpConstants.DISABLE_REST_SERVICE_DISPATCHING);
+ if (param != null && param.getValue() != null) {
+ if (param.getValue().equals("true")) {
+ restDispatching = false;
+ }
+ }
+
+ // create http Get processor
+ param = transportIn.getParameter(NhttpConstants.HTTP_GET_PROCESSOR);
+ if (param != null && param.getValue() != null) {
+ httpGetRequestProcessor =
createHttpGetProcessor(param.getValue().toString());
+ if (httpGetRequestProcessor == null) {
+ handleException("Cannot create HttpGetRequestProcessor");
+ }
+ } else {
+ httpGetRequestProcessor = new DefaultHttpGetProcessor();
+ }
+
+ params = getListenerParameters();
+ }
+
+/**
+ * Create a priority executor from the given file
+ *
+ * @param fileName file name of the executor configuration
+ * @throws org.apache.axis2.AxisFault if an error occurs
+ */
+ private void createPriorityConfiguration(String fileName) throws AxisFault
{
+ OMElement definitions = null;
+ try {
+ FileInputStream fis = new FileInputStream(fileName);
+ definitions = new StAXOMBuilder(fis).getDocumentElement();
+ definitions.build();
+ } catch (FileNotFoundException e) {
+ handleException("Priority configuration file cannot be found : " +
fileName, e);
+ } catch (XMLStreamException e) {
+ handleException("Error parsing priority configuration xml file " +
fileName, e);
+ }
+
+ assert definitions != null;
+ OMElement executorElem = definitions.getFirstChildWithName(
+ new QName(ExecutorConstants.PRIORITY_EXECUTOR));
+
+ if (executorElem == null) {
+ handleException(ExecutorConstants.PRIORITY_EXECUTOR +
+ " configuration is mandatory for priority based routing");
+ }
+
+ executor = PriorityExecutorFactory.createExecutor(
+ null, executorElem, false, new Properties());
+ OMElement conditionsElem = definitions.getFirstChildWithName(
+ new QName(EvaluatorConstants.CONDITIONS));
+ if (conditionsElem == null) {
+ handleException("Conditions configuration is mandatory for
priority based routing");
+ }
+
+ executor.init();
+
+ assert conditionsElem != null;
+ OMAttribute defPriorityAttr = conditionsElem.getAttribute(
+ new QName(EvaluatorConstants.DEFAULT_PRIORITY));
+ if (defPriorityAttr != null) {
+ parser = new
Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
+ } else {
+ parser = new Parser();
+ }
+
+ try {
+ parser.init(conditionsElem);
+ } catch (EvaluatorException e) {
+ handleException("Invalid " + EvaluatorConstants.CONDITIONS +
+ " configuration for priority based mediation", e);
+ }
+
+ log.info("Created a priority based executor from the configuration: " +
+ fileName);
+ }
+
+ private HttpGetRequestProcessor createHttpGetProcessor(String str) throws
AxisFault {
+ Object obj = null;
+ try {
+ obj = Class.forName(str).newInstance();
+ } catch (ClassNotFoundException e) {
+ handleException("Error creating WSDL processor", e);
+ } catch (InstantiationException e) {
+ handleException("Error creating WSDL processor", e);
+ } catch (IllegalAccessException e) {
+ handleException("Error creating WSDL processor", e);
+ }
+
+ if (obj instanceof HttpGetRequestProcessor) {
+ return (HttpGetRequestProcessor) obj;
+ } else {
+ handleException("Error creating WSDL processor. The HttpProcessor
should be of type " +
+
"org.apache.synapse.transport.nhttp.HttpGetRequestProcessor");
+ }
+
+ return null;
+ }
+
+ /**
+ * get HTTP protocol parameters to which the listener must adhere to
+ * @return the applicable HTTP protocol parameters
+ */
+ private HttpParams getListenerParameters() {
+ HttpParams params = new BasicHttpParams();
+ NHttpConfiguration cfg = NHttpConfiguration.getInstance();
+ params
+ .setIntParameter(HttpConnectionParams.SO_TIMEOUT,
+ cfg.getProperty(HttpConnectionParams.SO_TIMEOUT, 60000))
+ .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE,
+ cfg.getProperty(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 *
1024))
+ .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK,
+ cfg.getProperty(HttpConnectionParams.STALE_CONNECTION_CHECK,
0) == 1)
+ .setBooleanParameter(HttpConnectionParams.TCP_NODELAY,
+ cfg.getProperty(HttpConnectionParams.TCP_NODELAY, 1) == 1)
+ .setParameter(HttpProtocolParams.ORIGIN_SERVER,
"Synapse-HttpComponents-NIO");
+
+ if (cfg.getBooleanValue(NIOReactorPNames.INTEREST_OPS_QUEUEING,
false)) {
+ params.setBooleanParameter(NIOReactorPNames.INTEREST_OPS_QUEUEING,
true);
+ }
+ return params;
+ }
+
+
+ public ConfigurationContext getCfgCtx() {
+ return cfgCtx;
+ }
+
+ public TransportInDescription getTransportIn() {
+ return transportIn;
+ }
+
+ public boolean isSsl() {
+ return ssl;
+ }
+
+ public NhttpMetricsCollector getMetrics() {
+ return metrics;
+ }
+
+ public PriorityExecutor getExecutor() {
+ return executor;
+ }
+
+ public Parser getParser() {
+ return parser;
+ }
+
+ public boolean isRestDispatching() {
+ return restDispatching;
+ }
+
+ public HttpGetRequestProcessor getHttpGetRequestProcessor() {
+ return httpGetRequestProcessor;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getBindAddress() {
+ return bindAddress;
+ }
+
+ public HttpParams getParams() {
+ return params;
+ }
+
+ private void handleException(String msg, Exception e) throws AxisFault {
+ log.error(msg, e);
+ throw new AxisFault(msg, e);
+ }
+
+ private void handleException(String msg) throws AxisFault {
+ log.error(msg);
+ throw new AxisFault(msg);
+ }
+}
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=1072963&r1=1072962&r2=1072963&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
Mon Feb 21 13:23:54 2011
@@ -121,15 +121,15 @@ public class ServerHandler implements NH
public static final String CONNECTION_CREATION_TIME =
"synapse.connectionCreationTime";
public static final String SERVER_CONNECTION_DEBUG =
"synapse.server-connection-debug";
- public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams
params,
- final boolean isHttps, final NhttpMetricsCollector metrics,
- Parser parser, PriorityExecutor executor, boolean restDispatching,
- HttpGetRequestProcessor httpGetRequestProcessor) {
+ private ListenerContext listenerContext = null;
+
+ public ServerHandler(ListenerContext listenerContext) {
super();
- this.cfgCtx = cfgCtx;
- this.params = params;
- this.isHttps = isHttps;
- this.metrics = metrics;
+ this.listenerContext = listenerContext;
+ this.cfgCtx = listenerContext.getCfgCtx();
+ this.params = listenerContext.getParams();
+ this.isHttps = listenerContext.isSsl();
+ this.metrics = listenerContext.getMetrics();
this.responseFactory = new DefaultHttpResponseFactory();
this.httpProcessor = getHttpProcessor();
this.connStrategy = new DefaultConnectionReuseStrategy();
@@ -137,10 +137,10 @@ public class ServerHandler implements NH
this.activeConnections = new ArrayList<NHttpServerConnection>();
this.latencyView = new LatencyView(isHttps);
this.threadingView = new ThreadingView("HttpServerWorker", true, 50);
- this.restDispatching = restDispatching;
+ this.restDispatching = listenerContext.isRestDispatching();
this.cfg = NHttpConfiguration.getInstance();
- if (executor == null) {
+ if (listenerContext.getExecutor() == null) {
this.workerPool = WorkerPoolFactory.getWorkerPool(
cfg.getServerCoreThreads(),
cfg.getServerMaxThreads(),
@@ -148,11 +148,11 @@ public class ServerHandler implements NH
cfg.getServerQueueLen(),
"Server Worker thread group", "HttpServerWorker");
} else {
- this.executor = executor;
- this.parser = parser;
+ this.executor = listenerContext.getExecutor();
+ this.parser = listenerContext.getParser();
}
- this.httpGetRequestProcessor = httpGetRequestProcessor;
+ this.httpGetRequestProcessor =
listenerContext.getHttpGetRequestProcessor();
}
/**
@@ -209,8 +209,8 @@ public class ServerHandler implements NH
metrics.incrementMessagesReceived();
}
// hand off processing of the request to a thread off the pool
- ServerWorker worker = new ServerWorker(cfgCtx, conn, isHttps,
metrics, this,
- request, is, response, os, restDispatching,
httpGetRequestProcessor);
+ ServerWorker worker = new ServerWorker(listenerContext, conn, this,
+ request, is, response, os);
if (workerPool != null) {
workerPool.execute(worker);
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java?rev=1072963&r1=1072962&r2=1072963&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
Mon Feb 21 13:23:54 2011
@@ -95,38 +95,33 @@ public class ServerWorker implements Run
* its output. This however does not force the processor to write a
response back as the
* traditional servlet service() method, but creates the background
required to write the
* response, if one would be created.
- * @param cfgCtx the Axis2 configuration context
+ *
+ * @param listenerContext the listener configuration
* @param conn the underlying http connection
- * @param isHttps whether https or not
- * @param metrics metrics for the transport
* @param serverHandler the handler of the server side messages
* @param request the http request received (might still be in the process
of being streamed)
* @param is the stream input stream to read the request body
* @param response the response to be populated if applicable
* @param os the output stream to write the response body if one is
applicable
- * @param isRestDispatching weather we should dispatch in case of rest
*/
- public ServerWorker(final ConfigurationContext cfgCtx, final
NHttpServerConnection conn,
- final boolean isHttps,
- final MetricsCollector metrics,
+ public ServerWorker(ListenerContext listenerContext,
+ final NHttpServerConnection conn,
final ServerHandler serverHandler,
final HttpRequest request, final InputStream is,
- final HttpResponse response, final OutputStream os,
- final boolean isRestDispatching,
- final HttpGetRequestProcessor httpGetRequestProcessor) {
+ final HttpResponse response, final OutputStream os) {
- this.cfgCtx = cfgCtx;
+ this.cfgCtx = listenerContext.getCfgCtx();
this.conn = conn;
- this.isHttps = isHttps;
- this.metrics = metrics;
+ this.isHttps = listenerContext.isSsl();
+ this.metrics = listenerContext.getMetrics();
this.serverHandler = serverHandler;
this.request = request;
this.response = response;
this.is = is;
this.os = os;
this.msgContext = createMessageContext(request);
- this.isRestDispatching = isRestDispatching;
- this.httpGetRequestProcessor = httpGetRequestProcessor;
+ this.isRestDispatching = listenerContext.isRestDispatching();
+ this.httpGetRequestProcessor =
listenerContext.getHttpGetRequestProcessor();
}
/**