If this is a real issue, I believe it happens with AbstractHttp1StreamDuplexer when consuming entities on both server and client. Here is a sample server program <https://gist.github.com/rhashimoto/76e4177446f1bb554659d733ef16b9be> to investigate the behavior. It would probably be easier to test with a client program but I've been using the server API so that was quicker for me to write.
The sample is pretty minimal. All I've done is set the HTTP1 initial window size to 8192 and add an AsyncExchangeHandler that creates an empty response and logs the AsyncDataConsumer method implementations to the console. Note that *I intentionally never update the CapacityChannel* - I have effectively made an infinitely slow consumer. To test, post data of more than 8 KB to the server (e.g. using curl). I expect to see in the log consume() calls totaling at least 8192 and nothing after that until the socket times out. Instead, what I see is consume() calls delivering all the data and then streamEnd(). Here's sample output for uploading almost 32 KB. I have used my IDE to add a logging breakpoint to show the capacity window on this line <https://github.com/apache/httpcomponents-core/blob/a60528ea58877d55dab266bd2813e065aac6ff2c/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L313>, so those are also included in the output below (in green if you're viewing HTML mail). Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE: Listening on /0:0:0:0:0:0:0:0:8080 Mon Sep 09, 2019 10:27:31.294 AM SlowConsumerTest$MyExchangeHandler handleRequest FINEST: handleRequest called Mon Sep 09, 2019 10:27:31.332 AM SlowConsumerTest$MyExchangeHandler consume FINEST: consume 8192 total 8192 AbstractHttp1StreamDuplexer:313 capacity=0 Mon Sep 09, 2019 10:27:31.341 AM SlowConsumerTest$MyExchangeHandler updateCapacity FINEST: updateCapacity called Mon Sep 09, 2019 10:27:31.342 AM SlowConsumerTest$MyExchangeHandler consume FINEST: consume 8192 total 16384 AbstractHttp1StreamDuplexer:313 capacity=-8192 Mon Sep 09, 2019 10:27:31.346 AM SlowConsumerTest$MyExchangeHandler updateCapacity FINEST: updateCapacity called Mon Sep 09, 2019 10:27:31.348 AM SlowConsumerTest$MyExchangeHandler consume FINEST: consume 8192 total 24576 AbstractHttp1StreamDuplexer:313 capacity=-16384 Mon Sep 09, 2019 10:27:31.352 AM SlowConsumerTest$MyExchangeHandler updateCapacity FINEST: updateCapacity called Mon Sep 09, 2019 10:27:31.354 AM SlowConsumerTest$MyExchangeHandler consume FINEST: consume 8150 total 32726 AbstractHttp1StreamDuplexer:313 capacity=-24534 Mon Sep 09, 2019 10:27:31.357 AM SlowConsumerTest$MyExchangeHandler streamEnd FINEST: streamEnd called My expectation is that there would no calls to consume() once capacity reached 0 and we would never see streamEnd(). Roy On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski <ol...@apache.org> wrote: > On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote: > > > > > > > > > > > > > client.start(Http1Config.custom().setBufferSize(256).setInitialWind > > > owSize(32).build()); > > > > > > > > > > This change alone will have little effect without adjusting the > > > session > > > buffer side as well. Given that the default session buffer size is > > > 8192 > > > with 32 byte initial capacity window one is likely to get multiple > > > #consume invocations with negative capacity. > > > > > > Hmm. Isn't setBufferSize(256) for the session buffer? Also, if I > > increase > > the amount of request data in the test to more than 8 KB: > > > > return new MultiLineResponseHandler("0123456789abcd", 1000); > > > > Then I can see the capacity window go far more negative than -8192, > > which I > > wouldn't expect with your explanation. > > > > It looks like AbstractHttp1StreamDuplexer reads its data with a > > blocking > > read without explicitly waiting on the NIO Selector configured by > > CapacityChannel (unless I missed it in the code). Does this type of > > read > > implicitly use the Selector? It doesn't appear to me that it does. > > For > > example, I can reach this line > > < > > > https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272 > > > > > right > > after the blocking read() when session.ioSession.key.interestOps is > > 0. > > > > Don't waste time on this if I'm not making sense. I'm not that > > familiar > > with the NIO API, and I don't have a use case that depends on this. > > Just > > wanted to bring it to someone's attention in case it indicated an > > issue. > > > > Roy > > Roy > > I am not sure we are talking about the same things because it do not > even understand if you are talking about the server side, client side > or both. Can you put together a test application that reproduces the > issue and describe what think is wrong? > > Oleg > > > > > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <ol...@apache.org> > > wrote: > > > > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote: > > > > By looking at the suggested 5.0 examples I was able to get an > > > > AsyncServerExchangeHandler subclass to play nicely with Kotlin > > > > coroutines > > > > on the AsyncDataProducer side of things, i.e. minimizing > > > > produce() > > > > polling > > > > and avoiding buffering. > > > > > > > > I haven't been as successful with throttling calls on the > > > > AsyncDataConsumer > > > > side, i.e. consume() calls keep being made even though the > > > > capacity > > > > window > > > > has gone negative. I think this might be the expected behavior > > > > because of > > > > this comment in AbstractHttp1StreamDuplexer: > > > > > > > > // At present the consumer can be forced to consume data > > > > // over its declared capacity in order to avoid having > > > > // unprocessed message body content stuck in the session > > > > // input buffer > > > > > > > > Does that refer to just the case where the capacity starts > > > > positive > > > > but > > > > data exceeding the capacity is delivered to consume()? Or does it > > > > refer to > > > > the behavior I see, which is that capacity updates (or the lack > > > > of > > > > them) > > > > don't seem to have any effect for HTTP/1.1? > > > > > > > > > > It is the former. Whatever data read from the underlying socket > > > channel > > > into the session buffer will be force-fed into the consumer > > > regardless > > > of its declared capacity for HTTP/1.1 connections. > > > > > > > I've also tried running the > > > > Http1IntegrationTest.testSlowResponseConsumer() > > > > test, substituting this line to trigger updateCapacity() calls: > > > > > > > > > > > > client.start(Http1Config.custom().setBufferSize(256).setInitialWi > > > > ndow > > > > Size(32).build()); > > > > > > > > > > This change alone will have little effect without adjusting the > > > session > > > buffer side as well. Given that the default session buffer size is > > > 8192 > > > with 32 byte initial capacity window one is likely to get multiple > > > #consume invocations with negative capacity. > > > > > > > > > > By adding the small initial window, I can see the capacityWindow > > > > going more > > > > and more negative on each consume() call with all the data > > > > buffered > > > > before > > > > the test code completes its first sleep. > > > > > > > > I don't have a specific use case for a slow consumer, just want > > > > to > > > > know if > > > > I'm misunderstanding something. > > > > > > > > > > I realized that we likely need to adjust the session buffer size > > > automatically when the initial window setting is below than value. > > > > > > Hope this helps > > > > > > Oleg > > > > > > > Thanks! > > > > Roy > > > > > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto < > > > > roy.hashim...@gmail.com> > > > > wrote: > > > > > > > > > Those are good leads, I'll pursue them. > > > > > > > > > > Thanks! > > > > > Roy > > > > > > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski < > > > > > ol...@apache.org> > > > > > wrote: > > > > > > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote: > > > > > > > Have you looked at the reactive extensions for HttpCore5? > > > > > > > They > > > > > > > demonstrate > > > > > > > how to implement AsyncEntityProducer/AsyncDataProducer with > > > > > > > support > > > > > > > for > > > > > > > backpressure (or you can just use the Reactive Streams API > > > > > > > instead): > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive > > > > > > > > > > > > > > > > > > > > > > > > > > Just a bit of background. In 5.0 one can no longer assume > > > > > > that > > > > > > one > > > > > > message exchange has exclusive ownership of the underlying > > > > > > connection. > > > > > > Multiplexed message exchanges in HTTP/2 and piplelined > > > > > > message > > > > > > exchanges in HTTP/1.1 must not block other concurrent > > > > > > exchanges. > > > > > > Message changes however can update their current capacity via > > > > > > `CapacityChannel`. Reactive extensions is a great example and > > > > > > also an > > > > > > alternative to the native APIs per Ryan's recommendation. > > > > > > > > > > > > If you prefer the native APIs you can take a look at the > > > > > > classic > > > > > > I/O > > > > > > adaptors that essentially emulate the classic blocking i/o on > > > > > > top > > > > > > of > > > > > > the new async APIs [1] or HTTP/1.1 integration tests [2] that > > > > > > have a > > > > > > number of 'slow' consumer / producer test cases. > > > > > > > > > > > > Cheers > > > > > > > > > > > > Oleg > > > > > > > > > > > > [1] > > > > > > > > > > > > > > https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic > > > > > > [2] > > > > > > > > > > > > > > https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java > > > > > > > > > > > > > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto < > > > > > > > roy.hashim...@gmail.com > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > I'm playing with asynchronous handlers in HttpCore 5, and > > > > > > > > I'd > > > > > > > > like > > > > > > > > to have > > > > > > > > an AsyncEntityProducer write data at its own (slow) rate > > > > > > > > like > > > > > > > > in > > > > > > > > this old > > > > > > > > thread < > > > > > > > > > > > > > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2 > > > > > > > > > . > > > > > > > > > > > > > > > > Writing to the DataStreamChannel whenever I want - > > > > > > > > outside > > > > > > > > the > > > > > > > > scope of a > > > > > > > > produce() method call - works fine, but I notice that > > > > > > > > produce() is > > > > > > > > being > > > > > > > > called every 5-6 milliseconds which ideally I would like > > > > > > > > to > > > > > > > > eliminate or > > > > > > > > reduce. > > > > > > > > > > > > > > > > The answer in the old thread was to use > > > > > > > > IOControl.suspendOutput() > > > > > > > > and > > > > > > > > IOControl.requestOutput(), but this class appears no > > > > > > > > longer > > > > > > > > to be > > > > > > > > in > > > > > > > > HttpCore 5. I see that there is a > > > > > > > > DataStreamChannel.requestOutput() > > > > > > > > but I > > > > > > > > haven't figured out what suspension call that should be > > > > > > > > paired > > > > > > > > with. I have > > > > > > > > tried simply returning 0 from my > > > > > > > > AsyncEntityProducer.available() > > > > > > > > override, > > > > > > > > but that doesn't seem to be it. > > > > > > > > > > > > > > > > Is there a new way to suspend/resume output in HttpCore > > > > > > > > 5? > > > > > > > > > > > > > > > > Thanks! > > > > > > > > Roy > > > > > > > > > > > > > > > > Kotlin source here > > > > > > > > < > > > > > > > > > > > > > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250 > > > > > > > > > . > > > > > > > > > > > > > > > > > > ----------------------------------------------------------- > > > > > > ---- > > > > > > ------ > > > > > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > > > > > For additional commands, e-mail: dev-h...@hc.apache.org > > > > > > > > > > > > > > > > > > > > > ----------------------------------------------------------------- > > > ---- > > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > > For additional commands, e-mail: dev-h...@hc.apache.org > > > > > > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > For additional commands, e-mail: dev-h...@hc.apache.org > >
--------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org For additional commands, e-mail: dev-h...@hc.apache.org