Author: remm
Date: Wed May 17 05:55:39 2006
New Revision: 407241
URL: http://svn.apache.org/viewcvs?rev=407241&view=rev
Log:
- Start work on comet support. Note: I think it doesn't work yet (I didn't
test it), and most of this is very preliminary. It is relatively
straightforward, though.
Added:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java (with
props)
tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java
(with props)
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java?rev=407241&view=auto
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java (added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java Wed May
17 05:55:39 2006
@@ -0,0 +1,21 @@
+package org.apache.catalina;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public interface CometProcessor {
+
+ public void begin(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+ public void end(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+
+ public void error(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+ public void read(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+
+}
Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/CometProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
Wed May 17 05:55:39 2006
@@ -19,6 +19,7 @@
import java.io.IOException;
+import org.apache.catalina.CometProcessor;
import org.apache.catalina.Context;
import org.apache.catalina.Globals;
import org.apache.catalina.Wrapper;
@@ -135,10 +136,35 @@
}
+ // Comet processing
+ if (request.getWrapper() != null
+ && request.getWrapper() instanceof CometProcessor) {
+ try {
+ if (request.getAttribute("org.apache.tomcat.comet.error") !=
null) {
+ ((CometProcessor)
request.getWrapper()).error(request.getRequest(), response.getResponse());
+ } else {
+ ((CometProcessor)
request.getWrapper()).read(request.getRequest(), response.getResponse());
+ }
+ } catch (IOException e) {
+ ;
+ } catch (Throwable t) {
+ log.error(sm.getString("coyoteAdapter.service"), t);
+ } finally {
+ // Recycle the wrapper request and response
+ if (request.getAttribute("org.apache.tomcat.comet") == null) {
+ request.recycle();
+ response.recycle();
+ }
+ }
+ return;
+ }
+
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", "Servlet/2.5");
}
+ boolean comet = false;
+
try {
// Parse and set Catalina and configuration specific
@@ -148,8 +174,16 @@
connector.getContainer().getPipeline().getFirst().invoke(request, response);
}
- response.finishResponse();
- req.action( ActionCode.ACTION_POST_REQUEST , null);
+ if (request.getAttribute("org.apache.tomcat.comet.support") ==
Boolean.TRUE
+ && request.getWrapper() instanceof CometProcessor) {
+ request.setAttribute("org.apache.tomcat.comet", Boolean.TRUE);
+ comet = true;
+ }
+
+ if (!comet) {
+ response.finishResponse();
+ req.action( ActionCode.ACTION_POST_REQUEST , null);
+ }
} catch (IOException e) {
;
@@ -157,8 +191,10 @@
log.error(sm.getString("coyoteAdapter.service"), t);
} finally {
// Recycle the wrapper request and response
- request.recycle();
- response.recycle();
+ if (!comet) {
+ request.recycle();
+ response.recycle();
+ }
}
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/connector/Request.java Wed
May 17 05:55:39 2006
@@ -1294,6 +1294,12 @@
if (readOnlyAttributes.containsKey(name)) {
return;
}
+
+ // Pass special attributes to the native layer
+ if (name.startsWith("org.apache.tomcat.")) {
+ coyoteRequest.getAttributes().remove(name);
+ }
+
found = attributes.containsKey(name);
if (found) {
value = attributes.get(name);
@@ -1301,7 +1307,7 @@
} else {
return;
}
-
+
// Notify interested application event listeners
Object listeners[] = context.getApplicationEventListeners();
if ((listeners == null) || (listeners.length == 0))
Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java?rev=407241&view=auto
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java
(added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java
Wed May 17 05:55:39 2006
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.servlets;
+
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometProcessor;
+
+
+/**
+ * Helper class to implement Comet functionality.
+ */
+public abstract class CometServlet
+ extends HttpServlet implements CometProcessor {
+
+ public void begin(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+
+ }
+
+ public void end(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ request.removeAttribute("org.apache.tomcat.comet");
+ }
+
+ public void error(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ end(request, response);
+ }
+
+ public abstract void read(HttpServletRequest request, HttpServletResponse
response)
+ throws IOException, ServletException;
+
+ protected void service(HttpServletRequest request, HttpServletResponse
response)
+ throws IOException, ServletException {
+
+ if (request.getAttribute("org.apache.tomcat.comet.support") ==
Boolean.TRUE) {
+ begin(request, response);
+ } else {
+ // FIXME: Implement without comet support
+ begin(request, response);
+
+ // Loop reading data
+
+ end(request, response);
+ }
+
+ }
+
+}
Propchange:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/servlets/CometServlet.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java Wed May
17 05:55:39 2006
@@ -35,6 +35,7 @@
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.AprEndpoint.Handler;
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
@@ -429,7 +430,12 @@
this.proto = proto;
}
- public boolean process(long socket) {
+ // FIXME: Support for this could be added in AJP as well
+ public SocketState event(long socket, boolean error) {
+ return SocketState.CLOSED;
+ }
+
+ public SocketState process(long socket) {
AjpAprProcessor processor = null;
try {
processor = (AjpAprProcessor) localProcessor.get();
@@ -460,7 +466,11 @@
((ActionHook) processor).action(ActionCode.ACTION_START,
null);
}
- return processor.process(socket);
+ if (processor.process(socket)) {
+ return SocketState.OPEN;
+ } else {
+ return SocketState.CLOSED;
+ }
} catch(java.net.SocketException e) {
// SocketExceptions are normal
@@ -487,7 +497,7 @@
((ActionHook) processor).action(ActionCode.ACTION_STOP,
null);
}
}
- return false;
+ return SocketState.CLOSED;
}
}
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
Wed May 17 05:55:39 2006
@@ -52,6 +52,7 @@
import org.apache.tomcat.util.http.FastHttpDateFormat;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AprEndpoint;
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
@@ -147,12 +148,6 @@
/**
- * State flag.
- */
- protected boolean started = false;
-
-
- /**
* Error flag.
*/
protected boolean error = false;
@@ -183,6 +178,12 @@
/**
+ * Comet used.
+ */
+ protected boolean comet = false;
+
+
+ /**
* Content delimitator for the request (if false, the connection will
* be closed at the end of the request).
*/
@@ -735,7 +736,53 @@
*
* @throws IOException error during an I/O operation
*/
- public boolean process(long socket)
+ public SocketState event(boolean error)
+ throws IOException {
+
+ RequestInfo rp = request.getRequestProcessor();
+
+ try {
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ if (error) {
+ request.setAttribute("org.apache.tomcat.comet.error",
Boolean.TRUE);
+ }
+ // FIXME: It is also possible to add a new "event" method in the
adapter
+ // or something similar
+ adapter.service(request, response);
+ if (request.getAttribute("org.apache.tomcat.comet") == null) {
+ comet = false;
+ endpoint.getCometPoller().remove(socket);
+ }
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else if (!comet) {
+ recycle();
+ endpoint.getPoller().add(socket);
+ return SocketState.OPEN;
+ } else {
+ return SocketState.LONG;
+ }
+ }
+
+ /**
+ * Process pipelined HTTP requests using the specified input and output
+ * streams.
+ *
+ * @throws IOException error during an I/O operation
+ */
+ public SocketState process(long socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
@@ -768,7 +815,7 @@
boolean keptAlive = false;
boolean openSocket = false;
- while (started && !error && keepAlive) {
+ while (!error && keepAlive) {
// Parsing the request header
try {
@@ -833,7 +880,10 @@
error = response.getErrorException() != null ||
statusDropsConnection(response.getStatus());
}
-
+ // Comet support
+ if (request.getAttribute("org.apache.tomcat.comet") !=
null) {
+ comet = true;
+ }
} catch (InterruptedIOException e) {
error = true;
} catch (Throwable t) {
@@ -845,25 +895,8 @@
}
// Finish the handling of the request
- try {
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
- inputBuffer.endRequest();
- } catch (IOException e) {
- error = true;
- } catch (Throwable t) {
- log.error(sm.getString("http11processor.request.finish"), t);
- // 500 - Internal Server Error
- response.setStatus(500);
- error = true;
- }
- try {
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
- outputBuffer.endRequest();
- } catch (IOException e) {
- error = true;
- } catch (Throwable t) {
- log.error(sm.getString("http11processor.response.finish"), t);
- error = true;
+ if (!comet) {
+ endRequest();
}
// If there was an error, make sure the request is counted as
@@ -873,17 +906,8 @@
}
request.updateCounters();
- rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
-
- // Don't reset the param - we'll see it as ended. Next request
- // will reset it
- // thrA.setParam(null);
- // Next request
- inputBuffer.nextRequest();
- outputBuffer.nextRequest();
-
// Do sendfile as needed: add socket to sendfile and end
- if (sendfileData != null) {
+ if (sendfileData != null && !error) {
sendfileData.socket = socket;
sendfileData.keepAlive = keepAlive;
if (!endpoint.getSendfile().add(sendfileData)) {
@@ -892,19 +916,63 @@
}
}
+ rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
+
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- // Recycle
+ if (comet) {
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ endpoint.getCometPoller().add(socket);
+ return SocketState.LONG;
+ }
+ } else {
+ recycle();
+ return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
+ }
+
+ }
+
+
+ public void endRequest() {
+
+ // Finish the handling of the request
+ try {
+ inputBuffer.endRequest();
+ } catch (IOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.finish"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+ try {
+ outputBuffer.endRequest();
+ } catch (IOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.response.finish"), t);
+ error = true;
+ }
+
+ // Next request
+ inputBuffer.nextRequest();
+ outputBuffer.nextRequest();
+
+ }
+
+
+ public void recycle() {
inputBuffer.recycle();
outputBuffer.recycle();
this.socket = 0;
-
- return openSocket;
-
}
-
+
// ----------------------------------------------------- ActionHook Methods
@@ -966,6 +1034,7 @@
// End the processing of the current request, and stop any further
// transactions with the client
+ comet = false;
try {
outputBuffer.endRequest();
} catch (IOException e) {
@@ -985,14 +1054,6 @@
// Do nothing
- } else if (actionCode == ActionCode.ACTION_START) {
-
- started = true;
-
- } else if (actionCode == ActionCode.ACTION_STOP) {
-
- started = false;
-
} else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {
// Get remote host address
@@ -1368,6 +1429,8 @@
if (endpoint.getUseSendfile()) {
request.setAttribute("org.apache.tomcat.sendfile.support",
Boolean.TRUE);
}
+ // Advertise comet support through a request attribute
+ request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);
}
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
Wed May 17 05:55:39 2006
@@ -20,6 +20,7 @@
import java.net.URLEncoder;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.management.MBeanRegistration;
@@ -598,20 +599,73 @@
// -------------------- Connection handler --------------------
static class Http11ConnectionHandler implements Handler {
- Http11AprProtocol proto;
- static int count=0;
- RequestGroupInfo global=new RequestGroupInfo();
- ThreadLocal localProcessor = new ThreadLocal();
+
+ protected Http11AprProtocol proto;
+ protected static int count = 0;
+ protected RequestGroupInfo global = new RequestGroupInfo();
+
+ protected ThreadLocal<Http11AprProcessor> localProcessor =
+ new ThreadLocal<Http11AprProcessor>();
+ protected ConcurrentHashMap<Long, Http11AprProcessor> connections =
+ new ConcurrentHashMap<Long, Http11AprProcessor>();
+ protected java.util.Stack<Http11AprProcessor> recycledProcessors =
+ new java.util.Stack<Http11AprProcessor>();
- Http11ConnectionHandler( Http11AprProtocol proto ) {
- this.proto=proto;
+ Http11ConnectionHandler(Http11AprProtocol proto) {
+ this.proto = proto;
}
- public boolean process(long socket) {
+ public SocketState event(long socket, boolean error) {
+ Http11AprProcessor result = connections.get(socket);
+ SocketState state = SocketState.CLOSED;
+ if (result != null) {
+ boolean recycle = error;
+ // Call the appropriate event
+ try {
+ state = result.event(error);
+ } catch (java.net.SocketException e) {
+ // SocketExceptions are normal
+ Http11AprProtocol.log.debug
+ (sm.getString
+ ("http11protocol.proto.socketexception.debug"), e);
+ } catch (java.io.IOException e) {
+ // IOExceptions are normal
+ Http11AprProtocol.log.debug
+ (sm.getString
+ ("http11protocol.proto.ioexception.debug"), e);
+ }
+ // Future developers: if you discover any other
+ // rare-but-nonfatal exceptions, catch them here, and log as
+ // above.
+ catch (Throwable e) {
+ // any other exception or error is odd. Here we log it
+ // with "ERROR" level, so it will show up even on
+ // less-than-verbose logs.
+ Http11AprProtocol.log.error
+ (sm.getString("http11protocol.proto.error"), e);
+ } finally {
+ if (state != SocketState.LONG) {
+ connections.remove(socket);
+ recycledProcessors.push(result);
+ }
+ }
+ }
+ return state;
+ }
+
+ public SocketState process(long socket) {
Http11AprProcessor processor = null;
try {
processor = (Http11AprProcessor) localProcessor.get();
if (processor == null) {
+ synchronized (recycledProcessors) {
+ if (!recycledProcessors.isEmpty()) {
+ processor = recycledProcessors.pop();
+ localProcessor.set(processor);
+ }
+ }
+ }
+ if (processor == null) {
processor =
new Http11AprProcessor(proto.maxHttpHeaderSize,
proto.ep);
processor.setAdapter(proto.adapter);
@@ -647,7 +701,15 @@
((ActionHook) processor).action(ActionCode.ACTION_START,
null);
}
- return processor.process(socket);
+ SocketState state = processor.process(socket);
+ if (state == SocketState.LONG) {
+ // Associate the connection with the processor. The next
request
+ // processed by this thread will use either a new or a
recycled
+ // processor.
+ connections.put(socket, processor);
+ localProcessor.set(null);
+ }
+ return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
@@ -669,15 +731,8 @@
// less-than-verbose logs.
Http11AprProtocol.log.error
(sm.getString("http11protocol.proto.error"), e);
- } finally {
- // if(proto.adapter != null) proto.adapter.recycle();
- // processor.recycle();
-
- if (processor instanceof ActionHook) {
- ((ActionHook) processor).action(ActionCode.ACTION_STOP,
null);
- }
}
- return false;
+ return SocketState.CLOSED;
}
}
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
Wed May 17 05:55:39 2006
@@ -329,8 +329,7 @@
* consumed. This method only resets all the pointers so that we are ready
* to parse the next HTTP request.
*/
- public void nextRequest()
- throws IOException {
+ public void nextRequest() {
// Recycle Request object
request.recycle();
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=407241&r1=407240&r2=407241&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed
May 17 05:55:39 2006
@@ -300,6 +300,14 @@
/**
+ * Allow comet request handling.
+ */
+ protected boolean useComet = true;
+ public void setUseComet(boolean useComet) { this.useComet = useComet; }
+ public boolean getUseComet() { return useComet; }
+
+
+ /**
* Acceptor thread count.
*/
protected int acceptorThreadCount = 0;
@@ -335,6 +343,17 @@
/**
+ * The socket poller used for Comet support.
+ */
+ protected Poller[] cometPollers = null;
+ protected int cometPollerRoundRobin = 0;
+ public Poller getCometPoller() {
+ cometPollerRoundRobin = (cometPollerRoundRobin + 1) %
cometPollers.length;
+ return cometPollers[cometPollerRoundRobin];
+ }
+
+
+ /**
* The static file sender.
*/
protected Sendfile[] sendfiles = null;
@@ -561,11 +580,8 @@
addressStr = address.getHostAddress();
}
int family = Socket.APR_INET;
- if (Library.APR_HAVE_IPV6) {
- if (addressStr == null)
- family = Socket.APR_UNSPEC;
- else if (addressStr.indexOf(':') >= 0)
- family = Socket.APR_UNSPEC;
+ if (Library.APR_HAVE_IPV6 && (addressStr == null ||
addressStr.indexOf(':') >= 0)) {
+ family = Socket.APR_UNSPEC;
}
long inetAddress = Address.info(addressStr, family,
port, 0, rootPool);
@@ -712,7 +728,7 @@
// Start poller threads
pollers = new Poller[pollerThreadCount];
for (int i = 0; i < pollerThreadCount; i++) {
- pollers[i] = new Poller();
+ pollers[i] = new Poller(false);
pollers[i].init();
Thread pollerThread = new Thread(pollers[i], getName() +
"-Poller-" + i);
pollerThread.setPriority(threadPriority);
@@ -720,6 +736,17 @@
pollerThread.start();
}
+ // Start comet poller threads
+ cometPollers = new Poller[pollerThreadCount];
+ for (int i = 0; i < pollerThreadCount; i++) {
+ cometPollers[i] = new Poller(true);
+ cometPollers[i].init();
+ Thread pollerThread = new Thread(cometPollers[i], getName() +
"-CometPoller-" + i);
+ pollerThread.setPriority(threadPriority);
+ pollerThread.setDaemon(true);
+ pollerThread.start();
+ }
+
// Start sendfile threads
if (useSendfile) {
sendfiles = new Sendfile[sendfileThreadCount];
@@ -998,6 +1025,26 @@
}
+ /**
+ * Process given socket for an event.
+ */
+ protected boolean processSocket(long socket, boolean error) {
+ try {
+ if (executor == null) {
+ getWorkerThread().assign(socket, error);
+ } else {
+ executor.execute(new SocketEventProcessor(socket, error));
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+
// --------------------------------------------------- Acceptor Inner Class
@@ -1060,10 +1107,18 @@
protected long[] addS;
protected int addCount = 0;
+ protected long[] removeS;
+ protected int removeCount = 0;
+ protected boolean comet = true;
+
protected int keepAliveCount = 0;
public int getKeepAliveCount() { return keepAliveCount; }
+ public Poller(boolean comet) {
+ this.comet = comet;
+ }
+
/**
* Create the poller. With some versions of APR, the maximum poller
size will
* be 62 (reocmpiling APR is necessary to remove this limitation).
@@ -1071,19 +1126,29 @@
protected void init() {
pool = Pool.create(serverSockPool);
int size = pollerSize / pollerThreadCount;
- serverPollset = allocatePoller(size, pool, soTimeout);
+ int timeout = soTimeout;
+ if (comet) {
+ // FIXME: Find an appropriate timeout value, for now, "longer than usual"
+ // seems appropriate
+ timeout = soTimeout * 20;
+ }
+ serverPollset = allocatePoller(size, pool, timeout);
if (serverPollset == 0 && size > 1024) {
size = 1024;
- serverPollset = allocatePoller(size, pool, soTimeout);
+ serverPollset = allocatePoller(size, pool, timeout);
}
if (serverPollset == 0) {
size = 62;
- serverPollset = allocatePoller(size, pool, soTimeout);
+ serverPollset = allocatePoller(size, pool, timeout);
}
desc = new long[size * 2];
keepAliveCount = 0;
addS = new long[size];
addCount = 0;
+ if (comet) {
+ removeS = new long[size];
+ }
+ removeCount = 0;
}
/**
@@ -1092,18 +1157,32 @@
protected void destroy() {
// Close all sockets in the add queue
for (int i = 0; i < addCount; i++) {
+ if (comet) {
+ processSocket(addS[i], true);
+ }
Socket.destroy(addS[i]);
}
+ // Close all sockets in the remove queue
+ for (int i = 0; i < removeCount; i++) {
+ if (comet) {
+ processSocket(removeS[i], true);
+ }
+ Socket.destroy(removeS[i]);
+ }
// Close all sockets still in the poller
int rv = Poll.pollset(serverPollset, desc);
if (rv > 0) {
for (int n = 0; n < rv; n++) {
+ if (comet) {
+ processSocket(desc[n*2+1], true);
+ }
Socket.destroy(desc[n*2+1]);
}
}
Pool.destroy(pool);
keepAliveCount = 0;
addCount = 0;
+ removeCount = 0;
}
/**
@@ -1120,6 +1199,9 @@
// at most for pollTime before being polled
if (addCount >= addS.length) {
// Can't do anything: close the socket right away
+ if (comet) {
+ processSocket(socket, true);
+ }
Socket.destroy(socket);
return;
}
@@ -1130,6 +1212,30 @@
}
/**
+ * Remove specified socket and associated pool from the poller. The socket will
+ * be added to a temporary array, and removed from the pollset after a maximum amount
+ * of time equal to pollTime (in most cases, latency will be much lower,
+ * however). Note that this is automatic, except if the poller is used for
+ * comet.
+ *
+ * @param socket to remove from the poller
+ */
+ public void remove(long socket) {
+ synchronized (this) {
+ // Add socket to the removal list. The poller thread takes queued
+ // sockets out of the pollset before its next poll
+ if (removeCount >= removeS.length) {
+ // Normally, it cannot happen ...
+ Socket.destroy(socket);
+ return;
+ }
+ removeS[removeCount] = socket;
+ removeCount++;
+ this.notify();
+ }
+ }
+
+ /**
* The background thread that listens for incoming TCP/IP connections
and
* hands them off to an appropriate processor.
*/
@@ -1171,23 +1277,41 @@
keepAliveCount++;
} else {
// Can't do anything: close the socket
right away
+ if (comet) {
+ processSocket(addS[i], true);
+ }
Socket.destroy(addS[i]);
}
}
addCount = 0;
}
}
+ // Remove sockets which are waiting to be removed from the poller
+ if (removeCount > 0) {
+ synchronized (this) {
+ for (int i = 0; i < removeCount; i++) {
+ int rv = Poll.remove(serverPollset,
removeS[i]);
+ }
+ removeCount = 0;
+ }
+ }
+
maintainTime += pollTime;
// Pool for the specified interval
- int rv = Poll.poll(serverPollset, pollTime, desc, true);
+ int rv = Poll.poll(serverPollset, pollTime, desc, !comet);
if (rv > 0) {
keepAliveCount -= rv;
for (int n = 0; n < rv; n++) {
// Check for failed sockets and hand this socket
off to a worker
if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)
+ || (comet && (!processSocket(desc[n*2+1],
false)))
|| (!processSocket(desc[n*2+1]))) {
// Close socket and clear pool
+ if (comet) {
+ processSocket(desc[n*2+1], true);
+ Poll.remove(serverPollset, desc[n*2+1]);
+ }
Socket.destroy(desc[n*2+1]);
continue;
}
@@ -1215,6 +1339,11 @@
keepAliveCount -= rv;
for (int n = 0; n < rv; n++) {
// Close socket and clear pool
+ if (comet) {
+ // FIXME: should really close in case of
timeout ?
+ // FIXME: maybe comet should use an
extended timeout
+ processSocket(desc[n], true);
+ }
Socket.destroy(desc[n]);
}
}
@@ -1242,6 +1371,8 @@
protected Thread thread = null;
protected boolean available = false;
protected long socket = 0;
+ protected boolean event = false;
+ protected boolean error = false;
/**
@@ -1265,6 +1396,28 @@
// Store the newly available Socket and notify our thread
this.socket = socket;
+ event = false;
+ error = false;
+ available = true;
+ notifyAll();
+
+ }
+
+
+ protected synchronized void assign(long socket, boolean error) {
+
+ // Wait for the Processor to get the previous Socket
+ while (available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Store the newly available Socket and notify our thread
+ this.socket = socket;
+ event = true;
+ this.error = error;
available = true;
notifyAll();
@@ -1310,7 +1463,11 @@
continue;
// Process the request from this socket
- if (!handler.process(socket)) {
+ if ((event) && (handler.event(socket, error) ==
Handler.SocketState.CLOSED)) {
+ // Close socket and pool
+ Socket.destroy(socket);
+ socket = 0;
+ } else if (handler.process(socket) ==
Handler.SocketState.CLOSED) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
@@ -1622,7 +1779,11 @@
* thread local fields.
*/
public interface Handler {
- public boolean process(long socket);
+ public enum SocketState {
+ OPEN, CLOSED, LONG
+ }
+ public SocketState process(long socket);
+ public SocketState event(long socket, boolean error);
}
@@ -1700,7 +1861,38 @@
public void run() {
// Process the request from this socket
- if (!handler.process(socket)) {
+ if (handler.process(socket) == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ Socket.destroy(socket);
+ socket = 0;
+ }
+
+ }
+
+ }
+
+
+ // --------------------------------------- SocketEventProcessor Inner Class
+
+
+ /**
+ * This class is the equivalent of the Worker, but will simply be used in an
+ * external Executor thread pool.
+ */
+ protected class SocketEventProcessor implements Runnable {
+
+ protected long socket = 0;
+ protected boolean error = false;
+
+ public SocketEventProcessor(long socket, boolean error) {
+ this.socket = socket;
+ this.error = error;
+ }
+
+ public void run() {
+
+ // Process the request from this socket
+ if (handler.event(socket, error) == Handler.SocketState.CLOSED) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]