On Mon, 2019-09-09 at 11:45 -0700, Roy Hashimoto wrote: > I have observed it clearing the bit. I have seen the blocking read > here > < > https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L271 > > > happen when the Selector interestOps is 0, so the Selector does not > seem to > be gating the read. >
Read / write operations on IOSession are non-blocking and never block. Oleg > Roy > > On Mon, Sep 9, 2019 at 11:37 AM Oleg Kalnichevski <ol...@apache.org> > wrote: > > > On Mon, 2019-09-09 at 11:01 -0700, Roy Hashimoto wrote: > > > If this is a real issue, I believe it happens with > > > AbstractHttp1StreamDuplexer when consuming entities on both > > > server > > > and client. Here is a sample server program to investigate the > > > behavior. It would probably be easier to test with a client > > > program > > > but I've been using the server API so that was quicker for me to > > > write. > > > > > > The sample is pretty minimal. All I've done is set the HTTP1 > > > initial > > > window size to 8192 and add an AsyncExchangeHandler that creates > > > an > > > empty response and logs the AsyncDataConsumer method > > > implementations > > > to the console. Note that I intentionally never update the > > > CapacityChannel - I have effectively made an infinitely slow > > > consumer. > > > > > > To test, post data of more than 8 KB to the server (e.g. using > > > curl). > > > I expect to see in the log consume() calls totaling at least 8192 > > > and > > > nothing after that until the socket times out. Instead, what I > > > see is > > > consume() calls delivering all the data and then streamEnd(). > > > > > > Here's sample output for uploading almost 32 KB. I have used my > > > IDE > > > to add a logging breakpoint to show the capacity window on this > > > line, > > > so those are also included in the output below (in green if > > > you're > > > viewing HTML mail). > > > > > > Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE: > > > Listening on /0:0:0:0:0:0:0:0:8080 > > > Mon Sep 09, 2019 10:27:31.294 AM > > > SlowConsumerTest$MyExchangeHandler > > > handleRequest FINEST: handleRequest called > > > Mon Sep 09, 2019 10:27:31.332 AM > > > SlowConsumerTest$MyExchangeHandler > > > consume FINEST: consume 8192 total 8192 > > > AbstractHttp1StreamDuplexer:313 capacity=0 > > > Mon Sep 09, 2019 10:27:31.341 AM > > > SlowConsumerTest$MyExchangeHandler > > > updateCapacity FINEST: updateCapacity called > > > Mon Sep 09, 2019 10:27:31.342 AM > > > SlowConsumerTest$MyExchangeHandler > > > consume FINEST: consume 8192 total 16384 > > > AbstractHttp1StreamDuplexer:313 capacity=-8192 > > > Mon Sep 09, 2019 10:27:31.346 AM > > > SlowConsumerTest$MyExchangeHandler > > > updateCapacity FINEST: updateCapacity called > > > Mon Sep 09, 2019 10:27:31.348 AM > > > SlowConsumerTest$MyExchangeHandler > > > consume FINEST: consume 8192 total 24576 > > > AbstractHttp1StreamDuplexer:313 capacity=-16384 > > > Mon Sep 09, 2019 10:27:31.352 AM > > > SlowConsumerTest$MyExchangeHandler > > > updateCapacity FINEST: updateCapacity called > > > Mon Sep 09, 2019 10:27:31.354 AM > > > SlowConsumerTest$MyExchangeHandler > > > consume FINEST: consume 8150 total 32726 > > > AbstractHttp1StreamDuplexer:313 capacity=-24534 > > > Mon Sep 09, 2019 10:27:31.357 AM > > > SlowConsumerTest$MyExchangeHandler > > > streamEnd FINEST: streamEnd called > > > > > > My expectation is that there would no calls to consume() once > > > capacity reached 0 and we would never see streamEnd(). > > > > > > > Roy > > > > My expectations would be the same. The protocol handler should be > > clearing read interest if the capacity window drops below zero, > > see: > > > > > > https://github.com/apache/httpcomponents-core/blob/master/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L609 > > > > I will investigate why it does not happen. > > > > Oleg > > > > > Roy > > > > > > On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski < > > > ol...@apache.org> > > > wrote: > > > > On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > client.start(Http1Config.custom().setBufferSize(256).setInitial > > > > Wind > > > > > > owSize(32).build()); > > > > > > > > > > > > > > > > > > > This change alone will have little effect without adjusting > > > > > > the > > > > > > session > > > > > > buffer side as well. Given that the default session buffer > > > > > > size > > > > > > > > is > > > > > > 8192 > > > > > > with 32 byte initial capacity window one is likely to get > > > > > > > > multiple > > > > > > #consume invocations with negative capacity. > > > > > > > > > > > > > > > Hmm. Isn't setBufferSize(256) for the session buffer? Also, > > > > > if I > > > > > increase > > > > > the amount of request data in the test to more than 8 KB: > > > > > > > > > > return new MultiLineResponseHandler("0123456789abcd", > > > > > 1000); > > > > > > > > > > Then I can see the capacity window go far more negative than > > > > > > > > -8192, > > > > > which I > > > > > wouldn't expect with your explanation. > > > > > > > > > > It looks like AbstractHttp1StreamDuplexer reads its data with > > > > > a > > > > > blocking > > > > > read without explicitly waiting on the NIO Selector > > > > > configured by > > > > > CapacityChannel (unless I missed it in the code). Does this > > > > > type > > > > > > > > of > > > > > read > > > > > implicitly use the Selector? It doesn't appear to me that it > > > > > > > > does. > > > > > For > > > > > example, I can reach this line > > > > > < > > > > > > > > > https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272 > > > > > > > > > > > > > > > > right > > > > > after the blocking read() when > > > > > session.ioSession.key.interestOps > > > > > > > > is > > > > > 0. > > > > > > > > > > Don't waste time on this if I'm not making sense. I'm not > > > > > that > > > > > familiar > > > > > with the NIO API, and I don't have a use case that depends on > > > > > > > > this. > > > > > Just > > > > > wanted to bring it to someone's attention in case it > > > > > indicated an > > > > > issue. > > > > > > > > > > Roy > > > > > > > > Roy > > > > > > > > I am not sure we are talking about the same things because it > > > > do > > > > not > > > > even understand if you are talking about the server side, > > > > client > > > > side > > > > or both. Can you put together a test application that > > > > reproduces > > > > the > > > > issue and describe what think is wrong? > > > > > > > > Oleg > > > > > > > > > > > > > > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski < > > > > > > > > ol...@apache.org> > > > > > wrote: > > > > > > > > > > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote: > > > > > > > By looking at the suggested 5.0 examples I was able to > > > > > > > get an > > > > > > > AsyncServerExchangeHandler subclass to play nicely with > > > > > > > > Kotlin > > > > > > > coroutines > > > > > > > on the AsyncDataProducer side of things, i.e. minimizing > > > > > > > produce() > > > > > > > polling > > > > > > > and avoiding buffering. > > > > > > > > > > > > > > I haven't been as successful with throttling calls on the > > > > > > > AsyncDataConsumer > > > > > > > side, i.e. consume() calls keep being made even though > > > > > > > the > > > > > > > capacity > > > > > > > window > > > > > > > has gone negative. I think this might be the expected > > > > > > > > behavior > > > > > > > because of > > > > > > > this comment in AbstractHttp1StreamDuplexer: > > > > > > > > > > > > > > // At present the consumer can be forced to > > > > > > > consume > > > > > > > > data > > > > > > > // over its declared capacity in order to avoid > > > > > > > > having > > > > > > > // unprocessed message body content stuck in the > > > > > > > > session > > > > > > > // input buffer > > > > > > > > > > > > > > Does that refer to just the case where the capacity > > > > > > > starts > > > > > > > positive > > > > > > > but > > > > > > > data exceeding the capacity is delivered to consume()? Or > > > > > > > > does it > > > > > > > refer to > > > > > > > the behavior I see, which is that capacity updates (or > > > > > > > the > > > > > > > > lack > > > > > > > of > > > > > > > them) > > > > > > > don't seem to have any effect for HTTP/1.1? > > > > > > > > > > > > > > > > > > > It is the former. Whatever data read from the underlying > > > > > > socket > > > > > > channel > > > > > > into the session buffer will be force-fed into the consumer > > > > > > regardless > > > > > > of its declared capacity for HTTP/1.1 connections. > > > > > > > > > > > > > I've also tried running the > > > > > > > Http1IntegrationTest.testSlowResponseConsumer() > > > > > > > test, substituting this line to trigger updateCapacity() > > > > > > > > calls: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > client.start(Http1Config.custom().setBufferSize(256).setInitial > > > > Wi > > > > > > > ndow > > > > > > > Size(32).build()); > > > > > > > > > > > > > > > > > > > This change alone will have little effect without adjusting > > > > > > the > > > > > > session > > > > > > buffer side as well. Given that the default session buffer > > > > > > size > > > > > > > > is > > > > > > 8192 > > > > > > with 32 byte initial capacity window one is likely to get > > > > > > > > multiple > > > > > > #consume invocations with negative capacity. > > > > > > > > > > > > > > > > > > > By adding the small initial window, I can see the > > > > > > > > capacityWindow > > > > > > > going more > > > > > > > and more negative on each consume() call with all the > > > > > > > data > > > > > > > buffered > > > > > > > before > > > > > > > the test code completes its first sleep. > > > > > > > > > > > > > > I don't have a specific use case for a slow consumer, > > > > > > > just > > > > > > > > want > > > > > > > to > > > > > > > know if > > > > > > > I'm misunderstanding something. > > > > > > > > > > > > > > > > > > > I realized that we likely need to adjust the session buffer > > > > > > > > size > > > > > > automatically when the initial window setting is below than > > > > > > > > value. > > > > > > > > > > > > Hope this helps > > > > > > > > > > > > Oleg > > > > > > > > > > > > > Thanks! > > > > > > > Roy > > > > > > > > > > > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto < > > > > > > > roy.hashim...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > Those are good leads, I'll pursue them. > > > > > > > > > > > > > > > > Thanks! > > > > > > > > Roy > > > > > > > > > > > > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski < > > > > > > > > ol...@apache.org> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt > > > > > > > > > wrote: > > > > > > > > > > Have you looked at the reactive extensions for > > > > > > > > HttpCore5? > > > > > > > > > > They > > > > > > > > > > demonstrate > > > > > > > > > > how to implement > > > > > > > > > > AsyncEntityProducer/AsyncDataProducer > > > > > > > > with > > > > > > > > > > support > > > > > > > > > > for > > > > > > > > > > backpressure (or you can just use the Reactive > > > > > > > > > > Streams > > > > > > > > API > > > > > > > > > > instead): > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Just a bit of background. In 5.0 one can no longer > > > > > > > > > assume > > > > > > > > > that > > > > > > > > > one > > > > > > > > > message exchange has exclusive ownership of the > > > > > > > > underlying > > > > > > > > > connection. > > > > > > > > > Multiplexed message exchanges in HTTP/2 and > > > > > > > > > piplelined > > > > > > > > > message > > > > > > > > > exchanges in HTTP/1.1 must not block other concurrent > > > > > > > > > exchanges. > > > > > > > > > Message changes however can update their current > > > > > > > > > capacity > > > > > > > > via > > > > > > > > > `CapacityChannel`. Reactive extensions is a great > > > > > > > > > example > > > > > > > > and > > > > > > > > > also an > > > > > > > > > alternative to the native APIs per Ryan's > > > > > > > > > recommendation. > > > > > > > > > > > > > > > > > > If you prefer the native APIs you can take a look at > > > > > > > > > the > > > > > > > > > classic > > > > > > > > > I/O > > > > > > > > > adaptors that essentially emulate the classic > > > > > > > > > blocking > > > > > > > > i/o on > > > > > > > > > top > > > > > > > > > of > > > > > > > > > the new async APIs [1] or HTTP/1.1 integration tests > > > > > > > > > [2] > > > > > > > > that > > > > > > > > > have a > > > > > > > > > number of 'slow' consumer / producer test cases. > > > > > > > > > > > > > > > > > > Cheers > > > > > > > > > > > > > > > > > > Oleg > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic > > > > > > > > > [2] > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto < > > > > > > > > > > roy.hashim...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > I'm playing with asynchronous handlers in > > > > > > > > > > > HttpCore 5, > > > > > > > > and > > > > > > > > > > > I'd > > > > > > > > > > > like > > > > > > > > > > > to have > > > > > > > > > > > an AsyncEntityProducer write data at its own > > > > > > > > > > > (slow) > > > > > > > > rate > > > > > > > > > > > like > > > > > > > > > > > in > > > > > > > > > > > this old > > > > > > > > > > > thread < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2 > > > > > > > > > > > > . > > > > > > > > > > > > > > > > > > > > > > Writing to the DataStreamChannel whenever I want > > > > > > > > > > > - > > > > > > > > > > > outside > > > > > > > > > > > the > > > > > > > > > > > scope of a > > > > > > > > > > > produce() method call - works fine, but I notice > > > > > > > > > > > that > > > > > > > > > > > produce() is > > > > > > > > > > > being > > > > > > > > > > > called every 5-6 milliseconds which ideally I > > > > > > > > > > > would > > > > > > > > like > > > > > > > > > > > to > > > > > > > > > > > eliminate or > > > > > > > > > > > reduce. > > > > > > > > > > > > > > > > > > > > > > The answer in the old thread was to use > > > > > > > > > > > IOControl.suspendOutput() > > > > > > > > > > > and > > > > > > > > > > > IOControl.requestOutput(), but this class appears > > > > > > > > > > > no > > > > > > > > > > > longer > > > > > > > > > > > to be > > > > > > > > > > > in > > > > > > > > > > > HttpCore 5. I see that there is a > > > > > > > > > > > DataStreamChannel.requestOutput() > > > > > > > > > > > but I > > > > > > > > > > > haven't figured out what suspension call that > > > > > > > > > > > should > > > > > > > > be > > > > > > > > > > > paired > > > > > > > > > > > with. I have > > > > > > > > > > > tried simply returning 0 from my > > > > > > > > > > > AsyncEntityProducer.available() > > > > > > > > > > > override, > > > > > > > > > > > but that doesn't seem to be it. > > > > > > > > > > > > > > > > > > > > > > Is there a new way to suspend/resume output in > > > > > > > > HttpCore > > > > > > > > > > > 5? > > > > > > > > > > > > > > > > > > > > > > Thanks! > > > > > > > > > > > Roy > > > > > > > > > > > > > > > > > > > > > > Kotlin source here > > > > > > > > > > > < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250 > > > > > > > > > > > > . > > > > > > > > > > > > > > > > > > > > > > > > > > > --------------------------------------------------- > > > > > > > > > ---- > > > > > > > > ---- > > > > > > > > > ---- > > > > > > > > > ------ > > > > > > > > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > > > > > > > > For additional commands, e-mail: > > > > > > > > > dev-h...@hc.apache.org > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > --------------------------------------------------------- > > > > > > ---- > > > > > > > > ---- > > > > > > ---- > > > > > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > > > > > For additional commands, e-mail: dev-h...@hc.apache.org > > > > > > > > > > > > > > > > > > > > > > > > ------------------------------------------------------------- > > > > ---- > > > > ---- > > > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > > > For additional commands, e-mail: dev-h...@hc.apache.org > > > > > > > > > > --------------------------------------------------------------- > > > ------ > > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > > For additional commands, e-mail: dev-h...@hc.apache.org > > > > > > ----------------------------------------------------------------- > > ---- > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > For additional commands, e-mail: dev-h...@hc.apache.org > > > > --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org For additional commands, e-mail: dev-h...@hc.apache.org