Folks,
I've recently been investigating support for asynchronous response delivery
using Restlets for a use case such as Comet (a.k.a. Ajax Push, HTTP
Streaming, etc.) where the representation is not entirely available when the
response is started, and additional parts become available over time, until
the response finally completes.
For example, the observed behavior at the client would be:
o Request sent to server
o Response stream begins
o ...time passes...
o Response stream continues
o ...time passes...
o ...etc...
o ...time passes...
o Response stream completes
In order to support this scenario in a scalable way, it is important not to
pin a Thread during each "...time passes...", although pinning a Thread is
still functionally correct.
Note that when new data becomes available at the server, it is most likely
produced by a completely independent Thread, such as a server side
integration with an external data feed, like stock market information, or
else the data may be generated by another end user, such as sending a
message to a group chat room.
Considering how this style of data notification would map to Restlets, I
would like to trigger some discussion of how we might proceed to support
this scenario. In an effort to stimulate this discussion, here is a
proposal that is currently working for my locally patched org.restlet,
com.noelios.restlet, com.noelios.restlet.ext.simple and
com.noelios.restlet.ext.jetty. :-)
void Response.suspend(long timeout)
long Response.getTimeout()
void Response.resume(boolean complete)
An implementer of Restlet.handle(Request, Response) would not only set the
response Representation, but also call response.suspend(timeout) to indicate
the nature of the response. If a response is not suspended, then it behaves
the same as today. When Server.handle(Request, Response) unwinds, the
HttpServerCall has a chance to process the suspended response in a number
of different ways.
o pin the Thread via Object.wait(timeout)
o suspend via Jetty Continuation (throws special exception to free Thread,
resumes later)
o support a secondary Thread writing directly to the Response stream (e.g.
Grizzly?)
The main point here is that by capturing the desired semantics of suspend
and resume, there is plenty of freedom for integrations to choose their best
available implementation strategy.
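To make the simplest of these strategies concrete, here is a minimal sketch of the thread-pinning fallback in plain Java, independent of Restlet. The class name PinnedSuspension and its awaitResume() and isComplete() methods are hypothetical and exist only for illustration; suspend, getTimeout and resume mirror the three proposed Response methods. It assumes the timeout is an overall limit measured from the call to suspend.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Restlet): pins the calling thread until resumed. */
final class PinnedSuspension {
    private final Object lock = new Object();
    private long timeoutMillis;   // 0 means "never suspended"
    private long deadlineNanos;   // overall deadline, fixed when suspend() is called
    private boolean resumed;      // a resume has arrived and not yet been observed
    private boolean complete;     // a resume(true) has been seen

    /** Mirrors the proposed Response.suspend(long timeout). */
    void suspend(long timeoutMillis) {
        synchronized (lock) {
            this.timeoutMillis = timeoutMillis;
            this.deadlineNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        }
    }

    /** Mirrors the proposed Response.getTimeout(). */
    long getTimeout() {
        synchronized (lock) {
            return timeoutMillis;
        }
    }

    /** Mirrors the proposed Response.resume(boolean complete); safe to call from any thread. */
    void resume(boolean complete) {
        synchronized (lock) {
            this.resumed = true;
            this.complete = this.complete || complete;
            lock.notifyAll();
        }
    }

    /**
     * Blocks the calling (connector) thread until a resume arrives or the
     * deadline passes. Returns true if the entity should be written again,
     * false on timeout, interruption, or if suspend() was never called.
     */
    boolean awaitResume() {
        synchronized (lock) {
            while (!resumed) {
                long remaining = TimeUnit.NANOSECONDS.toMillis(
                        deadlineNanos - System.nanoTime());
                if (timeoutMillis <= 0 || remaining <= 0) {
                    return false;
                }
                try {
                    lock.wait(remaining); // the "pin the Thread" step
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            resumed = false; // several resumes before a wake-up collapse into one write
            return true;
        }
    }

    boolean isComplete() {
        synchronized (lock) {
            return complete;
        }
    }
}
```

A connector without native asynchronous support could then loop after handle() unwinds: call awaitResume(), write the entity each time it returns true, and stop once it returns false or isComplete() is true after a write.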
Later, after "...time passes...", when more data becomes available, the
response is resumed, causing the Representation to be written to the
response once more. This seems compatible with the concept of
Representations being "consumed" during the response - more data has become
available, so the Representation needs to be further consumed.
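The idea of a Representation being consumed a little further on each resume can be sketched without any Restlet classes. In the following plain-Java illustration, IncrementalEntity is a hypothetical stand-in for such a Representation, and each call to write() plays the role of one resume: it emits only the chunks queued since the previous call. Wrapping IOException in UncheckedIOException is a simplification to keep the sketch short.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a Representation that is written once per resume. */
final class IncrementalEntity {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    /** Called by a producer thread whenever more data becomes available. */
    void offer(String chunk) {
        pending.add(chunk);
    }

    /**
     * Writes only the chunks queued since the previous call and returns how
     * many were written. Calling it again later continues where it left off.
     */
    int write(OutputStream out) {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch); // non-blocking: takes whatever is queued right now
        try {
            for (String chunk : batch) {
                out.write(chunk.getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return batch.size();
    }
}
```

If two chunks are offered before the first write() and one more before a later write(), the stream receives all three in order, and a write() with nothing queued emits nothing.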
Below is a trivial example of the code for a Restlet that generates a pulse
of messages with gaps in between over a lengthy response. Normally it would
not make sense to create a new Thread during Restlet.handle(), but it helps
to clearly communicate the desired behavior for the purposes of this
example.
Note that using (locally patched) com.noelios.restlet.ext.simple, the Thread
is pinned between message deliveries, whereas using (locally patched)
com.noelios.restlet.ext.jetty, the same Restlet code allows the Thread to be
released between message deliveries and then re-acquired when the response is
resumed.
I would like to get some feedback from the Restlet gurus on this to see if
we can find a way to integrate some form of asynchronous request processing
in the near future. If it would be helpful to the discussion, I would be
happy to upload my local patches to the issue tracker.
Sample code usage now follows:
public void handle(Request request, Response response)
{
    final MessageHandler handler = new MessageHandler(response);
    BlockingQueueRepresentation entity = new BlockingQueueRepresentation(
            MediaType.TEXT_HTML, handler.getQueue());
    response.setEntity(entity);
    response.suspend(60L * 1000L); // suspend for at most 60 seconds
    new Thread("Message-Pulse") {
        public void run()
        {
            // start the message stream
            handler.getQueue().add(new StringRepresentation(
                    "<html><body><h1>Testing...</h1>"));
            handler.getResponse().resume(false);
            try
            {
                // send 10 messages, one every 3 seconds
                for (int i = 0; i < 10; i++)
                {
                    sleep(3000L);
                    handler.getQueue().add(new StringRepresentation(
                            String.format("<div>%d %TT</div>", i, new Date())));
                    handler.getResponse().resume(false);
                }
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            finally
            {
                // complete the response roughly 30 seconds after it was
                // initiated, and therefore before the 60 second timeout
                handler.getQueue().add(new StringRepresentation("</body></html>"));
                handler.getResponse().resume(true);
            }
        }
    }.start();
}
public class MessageHandler
{
    public MessageHandler(Response response)
    {
        _response = response;
        _queue = new LinkedBlockingQueue<Representation>();
    }
    public BlockingQueue<Representation> getQueue()
    {
        return _queue;
    }
    public Response getResponse()
    {
        return _response;
    }
    private final Response _response;
    private final BlockingQueue<Representation> _queue;
}
public class BlockingQueueRepresentation extends OutputRepresentation
{
    public BlockingQueueRepresentation(MediaType mediaType,
            BlockingQueue<Representation> representations)
    {
        super(mediaType);
        _representations = representations;
    }
    @Override
    public void write(OutputStream outputStream) throws IOException
    {
        // drain whatever has been queued since the last write, without blocking
        List<Representation> representations =
                new LinkedList<Representation>();
        _representations.drainTo(representations);
        if (!representations.isEmpty())
        {
            for (Representation representation : representations)
            {
                representation.write(outputStream);
            }
        }
    }
    private final BlockingQueue<Representation> _representations;
}
Kind Regards,
John Fallows