Author: asankha
Date: Fri Nov 27 09:59:57 2009
New Revision: 884809
URL: http://svn.apache.org/viewvc?rev=884809&view=rev
Log:
fix SYNAPSE-600
Modified:
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
Modified:
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL:
http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=884809&r1=884808&r2=884809&view=diff
==============================================================================
---
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
(original)
+++
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
Fri Nov 27 09:59:57 2009
@@ -40,7 +40,6 @@
import org.apache.http.nio.params.NIOReactorPNames;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
-import org.apache.http.nio.reactor.ListenerEndpoint;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
@@ -49,6 +48,7 @@
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -242,83 +242,103 @@
if (log.isDebugEnabled()) {
log.debug("Starting Listener...");
}
-
- // configure the IO reactor on the specified port
+ // start the Listener in a new seperate thread
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ startServerEngine();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, "HttpCoreNIOListener");
+
+ t.start();
+ log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener starting
on" +
+ (bindAddress != null ? " address : " + bindAddress : "") + " port
: " + port);
+ }
+
+ /**
+ * configure and start the IO reactor on the specified port
+ */
+ private void startServerEngine() {
HttpParams params = getServerParameters();
try {
String prefix = (sslContext == null ? "http" : "https") +
"-Listener I/O dispatcher";
ioReactor = new DefaultListeningIOReactor(
- NHttpConfiguration.getInstance().getServerIOWorkers(),
+ NHttpConfiguration.getInstance().getServerIOWorkers(),
new NativeThreadFactory(new ThreadGroup(prefix + " thread
group"), prefix), params);
ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
public boolean handle(IOException ioException) {
- log.warn("System may be unstable: IOReactor encountered a
checked exception : "
- + ioException.getMessage(), ioException);
- return true;
+ log.warn("System may be unstable: IOReactor encountered a
checked exception : " +
+ ioException.getMessage(), ioException);
+ if (ioException instanceof BindException) {
+ // bind failures considered OK to ignore
+ return true;
+ }
+ return false;
}
public boolean handle(RuntimeException runtimeException) {
- log.warn("System may be unstable: IOReactor encountered a
runtime exception : "
- + runtimeException.getMessage(), runtimeException);
- return true;
+ log.warn("System may be unstable: IOReactor encountered a
runtime exception : " +
+ runtimeException.getMessage(), runtimeException);
+ if (runtimeException instanceof
UnsupportedOperationException) {
+ // Unsupported operations considered OK to ignore
+ return true;
+ }
+ return false;
}
});
} catch (IOException e) {
- handleException("Error starting the IOReactor", e);
+ log.error("Error starting the IOReactor", e);
}
- for (Object obj :
cfgCtx.getAxisConfiguration().getServices().values()) {
- addToServiceURIMap((AxisService) obj);
- }
-
handler = new ServerHandler(cfgCtx, params, sslContext != null,
metrics);
- final IOEventDispatch ioEventDispatch = getEventDispatch(handler,
- sslContext, sslIOSessionHandler, params);
+ IOEventDispatch ioEventDispatch = getEventDispatch(
+ handler, sslContext, sslIOSessionHandler, params);
+
state = BaseConstants.STARTED;
-
- ListenerEndpoint endpoint;
+ boolean attemptAutoRestart = true;
try {
if (bindAddress == null) {
- endpoint = ioReactor.listen(new InetSocketAddress(port));
+ ioReactor.listen(new InetSocketAddress(port));
} else {
- endpoint = ioReactor.listen(new InetSocketAddress(
+ ioReactor.listen(new InetSocketAddress(
InetAddress.getByName(bindAddress), port));
}
+ ioReactor.execute(ioEventDispatch);
+ attemptAutoRestart = false;
+
+ } catch (InterruptedIOException ex) {
+ log.fatal("Reactor Interrupted");
} catch (IOException e) {
- handleException("Encountered an I/O error: " + e.getMessage(), e);
- return;
+ log.fatal("Encountered an I/O error: " + e.getMessage(), e);
+ } catch (Exception e) {
+ log.fatal("Encountered a Runtime exception : " + e.getMessage(),
e);
}
-
- // start the IO reactor in a new separate thread
- Thread t = new Thread(new Runnable() {
- public void run() {
- try {
- ioReactor.execute(ioEventDispatch);
- } catch (InterruptedIOException ex) {
- log.fatal("Reactor Interrupted");
- } catch (IOException e) {
- log.fatal("Encountered an I/O error: " + e.getMessage(),
e);
- } catch (Exception e) {
- log.fatal("Unexpected exception in I/O reactor", e);
+
+ if (attemptAutoRestart) {
+ log.info("IOReactor encountered a fatal exception. Attempting a
re-start of the reactor");
+ try {
+ log.info("Attempt shutdown of the IOReactor if still
running..");
+ ioReactor.shutdown();
+ } catch (IOException ignore) {}
+ handler.stop();
+
+ log.info("Restarting IOReactor.. ");
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ startServerEngine();
}
- log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener
Shutdown");
- }
- }, "HttpCoreNIOListener");
+ }, "HttpCoreNIOListener");
+ t.start();
+ log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener auto
re-starting");
- t.start();
-
- // Wait for the endpoint to become ready, i.e. for the listener to
start accepting
- // requests.
- try {
- endpoint.waitFor();
- } catch (InterruptedException e) {
- log.warn("HttpCoreNIOListener#start() was interrupted");
+ } else {
+ log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener
Shutdown");
}
-
- log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener started
on" +
- (bindAddress != null ? " address : " + bindAddress : "") + " port
: " + port);
- }
+ }
private void addToServiceURIMap(AxisService service) {
Parameter param =
service.getParameter(NhttpConstants.SERVICE_URI_LOCATION);
Modified:
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL:
http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=884809&r1=884808&r2=884809&view=diff
==============================================================================
---
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
(original)
+++
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
Fri Nov 27 09:59:57 2009
@@ -65,11 +65,13 @@
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.net.BindException;
import java.util.*;
/**
* NIO transport sender for Axis2 based on HttpCore and NIO extensions
*/
+...@suppresswarnings({"JavadocReference"})
public class HttpCoreNIOSender extends AbstractHandler implements
TransportSender, ManagementSupport {
private static final Log log = LogFactory.getLog(HttpCoreNIOSender.class);
@@ -109,7 +111,7 @@
* @param transportOut the description of the http/s transport from Axis2
configuration
* @throws AxisFault thrown on an error
*/
- public void init(ConfigurationContext cfgCtx, TransportOutDescription
transportOut) throws AxisFault {
+ public void init(ConfigurationContext cfgCtx, final
TransportOutDescription transportOut) throws AxisFault {
this.cfgCtx = cfgCtx;
// is this an SSL Sender?
@@ -151,44 +153,10 @@
cfgCtx.setNonReplicableProperty("warnOnHTTP500", warnOnHttp500);
}
- HttpParams params = getClientParameters();
- try {
- String prefix = (sslContext == null ? "http" : "https") + "-Sender
I/O dispatcher";
- ioReactor = new DefaultConnectingIOReactor(
- NHttpConfiguration.getInstance().getClientIOWorkers(),
- new NativeThreadFactory(new ThreadGroup(prefix + " thread
group"), prefix), params);
- ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
- public boolean handle(IOException ioException) {
- log.warn("System may be unstable: IOReactor encountered a
checked exception : " +
- ioException.getMessage(), ioException);
- return true;
- }
-
- public boolean handle(RuntimeException runtimeException) {
- log.warn("System may be unstable: IOReactor encountered a
runtime exception : " +
- runtimeException.getMessage(), runtimeException);
- return true;
- }
- });
- } catch (IOException e) {
- log.error("Error starting the IOReactor", e);
- }
-
- handler = new ClientHandler(cfgCtx, params, metrics);
- final IOEventDispatch ioEventDispatch = getEventDispatch(
- handler, sslContext, sslIOSessionHandler, params, transportOut);
-
// start the Sender in a new seperate thread
Thread t = new Thread(new Runnable() {
public void run() {
- try {
- ioReactor.execute(ioEventDispatch);
- } catch (InterruptedIOException ex) {
- log.fatal("Reactor Interrupted");
- } catch (IOException e) {
- log.fatal("Encountered an I/O error: " + e.getMessage(),
e);
- }
- log.info((sslContext == null ? "HTTP" : "HTTPS") + " Sender
Shutdown");
+ executeClientEngine(transportOut);
}
}, "HttpCoreNIOSender");
t.start();
@@ -521,6 +489,86 @@
}
/**
+ * Configure and start the IOReactor
+ */
+ private void executeClientEngine(final TransportOutDescription
transportOut) {
+
+ HttpParams params = getClientParameters();
+ try {
+ String prefix = (sslContext == null ? "http" : "https") + "-Sender
I/O dispatcher";
+ ioReactor = new DefaultConnectingIOReactor(
+ NHttpConfiguration.getInstance().getClientIOWorkers(),
+ new NativeThreadFactory(new ThreadGroup(prefix + " thread
group"), prefix), params);
+ ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
+ public boolean handle(IOException ioException) {
+ log.warn("System may be unstable: IOReactor encountered a
checked exception : " +
+ ioException.getMessage(), ioException);
+ if (ioException instanceof BindException) {
+ // bind failures considered OK to ignore
+ return true;
+ }
+ return false;
+ }
+
+ public boolean handle(RuntimeException runtimeException) {
+ log.warn("System may be unstable: IOReactor encountered a
runtime exception : " +
+ runtimeException.getMessage(), runtimeException);
+ if (runtimeException instanceof
UnsupportedOperationException) {
+ // Unsupported operations considered OK to ignore
+ return true;
+ }
+ return false;
+ }
+ });
+ } catch (IOException e) {
+ log.error("Error starting the IOReactor", e);
+ }
+
+ handler = new ClientHandler(cfgCtx, params, metrics);
+ IOEventDispatch ioEventDispatch = null;
+ try {
+ ioEventDispatch = getEventDispatch(
+ handler, sslContext, sslIOSessionHandler, params, transportOut);
+ } catch (AxisFault axisFault) {
+ log.fatal("Error getting event dispatch");
+ }
+
+ state = BaseConstants.STARTED;
+ boolean attemptAutoRestart = true;
+ try {
+ ioReactor.execute(ioEventDispatch);
+ attemptAutoRestart = false;
+ } catch (InterruptedIOException ex) {
+ log.fatal("Reactor Interrupted");
+ } catch (IOException e) {
+ log.fatal("Encountered an I/O error: " + e.getMessage(), e);
+ } catch (Exception e) {
+ log.fatal("Encountered an error: " + e.getMessage(), e);
+ }
+
+ if (attemptAutoRestart) {
+ log.info("IOReactor encountered a fatal exception. Attempting a
re-start of the reactor");
+ try {
+ log.info("Attempt shutdown of the IOReactor if still
running..");
+ ioReactor.shutdown();
+ } catch (IOException ignore) {}
+ handler.stop();
+
+ log.info("Restarting IOReactor.. ");
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ executeClientEngine(transportOut);
+ }
+ }, "HttpCoreNIOSender");
+ t.start();
+ log.info((sslContext == null ? "HTTP" : "HTTPS") + " Sender auto
re-starting");
+
+ } else {
+ log.info((sslContext == null ? "HTTP" : "HTTPS") + " Sender
Shutdown");
+ }
+ }
+
+ /**
* Determine the HttpStatusCodedepending on the message type processed <br>
* (normal response versus fault response) as well as Axis2 message
context properties set
* via Synapse configuration or MessageBuilders.