Oleg,

Thank you for the quick response. An unbounded thread pool is not acceptable for me, but there is also no need to process an arbitrary number of concurrent requests. It is OK for me not to accept new connections until some worker becomes idle. So, the thread count in the pool can be limited by the number of concurrent connections, without any restrictions in the thread pool itself.
So, my first idea is to override DefaultListeningIOReactor.addChannel() so that it blocks when the current connection count is above some config-defined value, until an existing connection closes. By setting the maximum connection count to half of the desired maximum worker count, I'll avoid any blocking inside the request handler. As far as I understand, channels have a one-to-one correspondence with connections; please correct me if they do not. It seems to me that this will work fine. Please let me know if I missed something.

I have two other thoughts concerning thread count limitation.

1. Implement an IOReactor like AbstractMultiworkerIOReactor, but with a limited thread pool instead of "Thread[] threads;". AbstractMultiworkerIOReactor.addChannel() would then try to put a new Worker(channel, dispatcher, eventDispatch), which serves a single channel, or block if there are no idle threads. In this case, HttpAsyncRequestHandler<HttpRequest>.handle() will never be called concurrently with the corresponding HttpAsyncResponseProducer.produceContent(), simply because they will be called in series.

2. Implement an IOReactor like AbstractMultiworkerIOReactor, but with the worker threads split into four separate groups, one for each possible operation from SelectionKey. That way, an HttpAsyncRequestHandler<HttpRequest>.handle() blocked on putting a task into the thread pool won't prevent worker threads from the OP_WRITE group from executing their tasks. The corresponding channels would be moved from one worker group to another based on some event, such as HttpAsyncExchange.submitResponse().

Please let me know if either of these two approaches has significant advantages over overriding DefaultListeningIOReactor.addChannel().
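To make the first idea concrete, here is a minimal JDK-only sketch of the admission-control part. It is not HttpCore code: a Semaphore caps the number of in-flight connections, and a hypothetical addChannel() override would acquire a permit before dispatching the channel and release it from the connection-closed callback. All class and method names here are illustrative.

```java
import java.util.concurrent.Semaphore;

// Sketch of the admission-control idea: a Semaphore caps the number of
// open connections. The accepting thread blocks in onChannelAccepted()
// until an existing connection closes and releases a permit.
class BoundedChannelGate {
    private final Semaphore permits;

    BoundedChannelGate(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    // Call before handing the accepted channel to the dispatcher; blocks
    // the accepting thread while the connection limit is reached.
    void onChannelAccepted() throws InterruptedException {
        permits.acquire();
    }

    // Call from the connection-closed callback.
    void onChannelClosed() {
        permits.release();
    }

    int availableSlots() {
        return permits.availablePermits();
    }
}

class GateDemo {
    public static void main(String[] args) throws Exception {
        BoundedChannelGate gate = new BoundedChannelGate(2);
        gate.onChannelAccepted();
        gate.onChannelAccepted();
        // A third accept would now block until a close releases a permit.
        gate.onChannelClosed();
        System.out.println(gate.availableSlots()); // prints 1
    }
}
```

Since the blocking happens in the accepting thread rather than in an I/O dispatch thread, the request handlers themselves stay non-blocking.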
--
Dmitry

On Wed, Sep 26, 2012 at 12:05 AM, Oleg Kalnichevski <[email protected]> wrote:
> On Tue, 2012-09-25 at 20:50 +0400, Dmitry Potapov wrote:
>> Hello everyone,
>>
>> I'm trying to implement HttpAsyncResponseProducer using
>> SharedOutputBuffer and found that I'm unable to control amount of
>> concurrent response producers. Most probably, this problem comes from
>> my incorrect understanding of HttpAsyncResponseProducer design. So,
>> let me explain, how I'm solving this problem now and what problems I
>> expect if I will use response producer.
>>
>> Currently, in HttpAsyncRequestHandler<HttpRequest>.handle(), I create
>> worker object, which will write content to PipedOutputStream, and push
>> it into executor (mine implementation of Executor.execute() will block
>> if there is no idle worker threads to process passed task). After
>> Executor.execute() returns I call
>> HttpAsyncExchange.submitResponse(BasicAsyncResponseProducer(response))
>> where response contains InputStreamEntity, which handles corresponding
>> PipedInputStream.
>> That's it, amount of concurrent HttpAsyncContentProducer objects is
>> limited by executor queue.
>>
>> In case of HttpAsyncResponseProducer I'm using the following logic:
>> 1. In HttpAsyncRequestHandler<HttpRequest>.handle() I'm submitting my
>> implementation of HttpAsyncResponseProducer to
>> HttpAsyncExchange.submitResponse().
>> 2. In HttpAsyncResponseProducer.produceContent() checks if it is a
>> first call, and if so creates SharedOutputBuffer using passed
>> IOControl and, using created buffer, creates new worker object and
>> pass it to executor.
>> 3. If this wasn't a first call to
>> HttpAsyncResponseProducer.produceContent() then I simply call
>> SharedOutputBuffer.produceContent()
>>
>> The problem is that all IOReactor worker threads can stuck in
>> Executor.execute() and Executor worker threads will stuck on
>> SharedOutputBuffer.write(), because all IOReactor worker threads
>> waiting for their completion.
>>
>> There is another approach which can help avoid problem described
>> above. Executor.execute() can reject task (by throwing an exception)
>> if there is no idle workers. In case of exception
>> HttpAsyncResponseProducer.produceContent() will do nothing and will
>> try to create SharedOutputBuffer and task on the next
>> HttpAsyncResponseProducer.produceContent() invocation. But in this
>> case amount of response producers can grow limitless and cause
>> OutOfMemoryException, so, this approach is not acceptable, too.
>>
>> Can somebody explain, which is a proper way to implement asynchronous
>> handler, which content is being generated by separate thread?
>> Unfortunately, I was unable to find any hints on this in source code.
>>
>
> Dmitry,
>
> Whenever one ends up mixing transport code based on a non-blocking i/o
> and processing code based on inherently blocking InputStream /
> OutputStream API things are bound to get very ugly.
>
> Generally one must not invoke any potentially blocking code from either
> request handler, request consumer or response producer (or basically any
> routine executed by an I/O dispatch thread) or risk blocking all
> connections managed by that thread. Therefore making a call to
> Executor#execute() from an I/O dispatch thread is a very risky
> proposition unless the executor is backed by an unbounded thread pool.
>
> My recommendation would be to use an unbounded thread pool for
> simplicity, but to make sure it shrinks automatically when load gets
> lighter. If you are absolutely sure you want to use a fixed number of
> worker threads to process an arbitrary number of concurrent requests
> you need to queue HttpAsyncExchange instances and employ an additional
> thread to distribute them across a bounded pool of worker threads. To
> sum it all up, whatever you do you want to make sure it is done without
> blocking on a mutex longer than absolutely necessary.
>
> You can have a look at the CXF async client HTTP transport as an example
> [1]. It is client side but the general approach should be applicable to
> the server side as well.
>
> Hope this helps somewhat. I'll happily deal with any follow-up questions
> tomorrow.
>
> Oleg
>
> [1]
> http://svn.apache.org/repos/asf/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
> ---------------------------------------------------------------------
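The queued-exchange approach recommended in the quoted reply (queue HttpAsyncExchange instances and let one extra thread distribute them across a bounded worker pool) might be sketched with plain JDK classes as follows. Runnable stands in for the exchange, and all names are illustrative, not HttpCore API:

```java
import java.util.concurrent.*;

// Sketch of the queued-exchange pattern: the I/O dispatch thread only
// enqueues work (never blocks), while a single distributor thread waits
// for a free worker and hands each task to a bounded pool.
class ExchangeDistributor {
    private final BlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();
    private final ExecutorService workers;
    private final Semaphore idleWorkers;
    private final Thread distributor;

    ExchangeDistributor(int workerCount) {
        workers = Executors.newFixedThreadPool(workerCount);
        idleWorkers = new Semaphore(workerCount);
        distributor = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = pending.take();
                    idleWorkers.acquire(); // only the distributor blocks here
                    workers.execute(() -> {
                        try {
                            task.run();
                        } finally {
                            idleWorkers.release();
                        }
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        distributor.setDaemon(true);
        distributor.start();
    }

    // Safe to call from an I/O dispatch thread: never blocks.
    void submit(Runnable exchange) {
        pending.add(exchange);
    }

    void shutdown() throws InterruptedException {
        distributor.interrupt();
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Note that the pending queue itself is unbounded, which is exactly the trade-off the quoted reply warns about: if producers outpace the fixed worker pool, memory use can still grow without limit.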
