> > > > client.start(Http1Config.custom().setBufferSize(256).setInitialWindowSize(32).build()); > > > This change alone will have little effect without adjusting the session > buffer side as well. Given that the default session buffer size is 8192 > with 32 byte initial capacity window one is likely to get multiple > #consume invocations with negative capacity.
Hmm. Isn't setBufferSize(256) for the session buffer? Also, if I increase the amount of request data in the test to more than 8 KB: return new MultiLineResponseHandler("0123456789abcd", 1000); Then I can see the capacity window go far more negative than -8192, which I wouldn't expect with your explanation. It looks like AbstractHttp1StreamDuplexer reads its data with a blocking read without explicitly waiting on the NIO Selector configured by CapacityChannel (unless I missed it in the code). Does this type of read implicitly use the Selector? It doesn't appear to me that it does. For example, I can reach this line <https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272> right after the blocking read() when session.ioSession.key.interestOps is 0. Don't waste time on this if I'm not making sense. I'm not that familiar with the NIO API, and I don't have a use case that depends on this. Just wanted to bring it to someone's attention in case it indicated an issue. Roy On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <ol...@apache.org> wrote: > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote: > > By looking at the suggested 5.0 examples I was able to get an > > AsyncServerExchangeHandler subclass to play nicely with Kotlin > > coroutines > > on the AsyncDataProducer side of things, i.e. minimizing produce() > > polling > > and avoiding buffering. > > > > I haven't been as successful with throttling calls on the > > AsyncDataConsumer > > side, i.e. consume() calls keep being made even though the capacity > > window > > has gone negative. I think this might be the expected behavior > > because of > > this comment in AbstractHttp1StreamDuplexer: > > > > // At present the consumer can be forced to consume data > > // over its declared capacity in order to avoid having > > // unprocessed message body content stuck in the session > > // input buffer > > > > Does that refer to just the case where the capacity starts positive > > but > > data exceeding the capacity is delivered to consume()? Or does it > > refer to > > the behavior I see, which is that capacity updates (or the lack of > > them) > > don't seem to have any effect for HTTP/1.1? > > > > It is the former. Whatever data read from the underlying socket channel > into the session buffer will be force-fed into the consumer regardless > of its declared capacity for HTTP/1.1 connections. > > > I've also tried running the > > Http1IntegrationTest.testSlowResponseConsumer() > > test, substituting this line to trigger updateCapacity() calls: > > > > > > client.start(Http1Config.custom().setBufferSize(256).setInitialWindow > > Size(32).build()); > > > > This change alone will have little effect without adjusting the session > buffer side as well. Given that the default session buffer size is 8192 > with 32 byte initial capacity window one is likely to get multiple > #consume invocations with negative capacity. > > > > By adding the small initial window, I can see the capacityWindow > > going more > > and more negative on each consume() call with all the data buffered > > before > > the test code completes its first sleep. > > > > I don't have a specific use case for a slow consumer, just want to > > know if > > I'm misunderstanding something. > > > > I realized that we likely need to adjust the session buffer size > automatically when the initial window setting is below than value. > > Hope this helps > > Oleg > > > Thanks! > > Roy > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto < > > roy.hashim...@gmail.com> > > wrote: > > > > > Those are good leads, I'll pursue them. > > > > > > Thanks! > > > Roy > > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <ol...@apache.org> > > > wrote: > > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote: > > > > > Have you looked at the reactive extensions for HttpCore5? They > > > > > demonstrate > > > > > how to implement AsyncEntityProducer/AsyncDataProducer with > > > > > support > > > > > for > > > > > backpressure (or you can just use the Reactive Streams API > > > > > instead): > > > > > > > > > > > > > > > > > > > > https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive > > > > > > > > > > > > > > > > > > Just a bit of background. In 5.0 one can no longer assume that > > > > one > > > > message exchange has exclusive ownership of the underlying > > > > connection. > > > > Multiplexed message exchanges in HTTP/2 and piplelined message > > > > exchanges in HTTP/1.1 must not block other concurrent exchanges. > > > > Message changes however can update their current capacity via > > > > `CapacityChannel`. Reactive extensions is a great example and > > > > also an > > > > alternative to the native APIs per Ryan's recommendation. > > > > > > > > If you prefer the native APIs you can take a look at the classic > > > > I/O > > > > adaptors that essentially emulate the classic blocking i/o on top > > > > of > > > > the new async APIs [1] or HTTP/1.1 integration tests [2] that > > > > have a > > > > number of 'slow' consumer / producer test cases. > > > > > > > > Cheers > > > > > > > > Oleg > > > > > > > > [1] > > > > > > https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic > > > > [2] > > > > > > https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java > > > > > > > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto < > > > > > roy.hashim...@gmail.com > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > I'm playing with asynchronous handlers in HttpCore 5, and I'd > > > > > > like > > > > > > to have > > > > > > an AsyncEntityProducer write data at its own (slow) rate like > > > > > > in > > > > > > this old > > > > > > thread < > > > > > > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2 > > > > > > > . > > > > > > > > > > > > Writing to the DataStreamChannel whenever I want - outside > > > > > > the > > > > > > scope of a > > > > > > produce() method call - works fine, but I notice that > > > > > > produce() is > > > > > > being > > > > > > called every 5-6 milliseconds which ideally I would like to > > > > > > eliminate or > > > > > > reduce. > > > > > > > > > > > > The answer in the old thread was to use > > > > > > IOControl.suspendOutput() > > > > > > and > > > > > > IOControl.requestOutput(), but this class appears no longer > > > > > > to be > > > > > > in > > > > > > HttpCore 5. I see that there is a > > > > > > DataStreamChannel.requestOutput() > > > > > > but I > > > > > > haven't figured out what suspension call that should be > > > > > > paired > > > > > > with. I have > > > > > > tried simply returning 0 from my > > > > > > AsyncEntityProducer.available() > > > > > > override, > > > > > > but that doesn't seem to be it. > > > > > > > > > > > > Is there a new way to suspend/resume output in HttpCore 5? > > > > > > > > > > > > Thanks! > > > > > > Roy > > > > > > > > > > > > Kotlin source here > > > > > > < > > > > > > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250 > > > > > > > . > > > > > > > > > > > > --------------------------------------------------------------- > > > > ------ > > > > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > > > > For additional commands, e-mail: dev-h...@hc.apache.org > > > > > > > > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: dev-unsubscr...@hc.apache.org > For additional commands, e-mail: dev-h...@hc.apache.org > >