+1. We've also been hit by OOMs on the broker because we were not able to properly bound its memory usage.
On Mon, Nov 14, 2016 at 5:56 PM, radai <radai.rosenbl...@gmail.com> wrote:

> @rajini - fixed the hasBytesBuffered() method. also updated poll() so that no latency is added for picking up data stuck in ssl buffers (timeout is set to 0, just like with immediately connected keys and staged receives).
> thank you for pointing these out.
> added ssl (re) testing to the KIP testing plan.
>
> On Mon, Nov 14, 2016 at 7:24 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
>
>> Open point 1. I would just retain the current long value that specifies queued.max.bytes as long and not as %heap since it is simple and easy to use. And keeps it consistent with other ".bytes" configs.
>>
>> Point 3. ssl buffers - I am not quite sure the implementation looks correct. hasBytesBuffered() is checking position() of buffers == 0. And the code checks this only when poll with a timeout returns (adding a delay when there is nothing else to read).
>> But since this and open point 2 (optimization) are implementation details, they can be looked at during PR review.
>>
>> It will be good to add SSL testing to the test plan as well, since there is additional code to test for SSL.
>>
>> On Fri, Nov 11, 2016 at 9:03 PM, radai <radai.rosenbl...@gmail.com> wrote:
>>
>> > ok, i've made the following changes:
>> >
>> > 1. memory.pool.class.name has been removed
>> > 2. the code now only uses SimpleMemoryPool. the gc variant is left (unused) as a development aid and is unsettable via configuration.
>> > 3. I've resolved the issue of stale data getting stuck in intermediate (ssl) buffers.
>> > 4. default value for queued.max.bytes is -1, so off by default. any <=0 value is interpreted as off by the underlying code.
>> >
>> > open points:
>> >
>> > 1. the kafka config framework doesn't allow a value to be either long or double, so in order to pull off the queued.max.bytes = 1000000 or queued.max.bytes = 0.3 thing i'd need to define the config as type string, which is ugly to me. do we want to support setting queued.max.bytes to % of heap? if so, by way of making queued.max.bytes of type string, or by way of a 2nd config param (with the resulting either/all/combination? validation). my personal opinion is string because i think a single queued.max.bytes with overloaded meaning is more understandable to users. i'll await other people's opinions before doing anything.
>> > 2. i still need to evaluate rajini's optimization. sounds doable.
>> >
>> > asides:
>> >
>> > 1. i think you guys misunderstood the intent behind the gc pool. it was never meant to be a magic pool that automatically releases buffers (because just as rajini stated the performance implications would be horrible). it was meant to catch leaks early. since that is indeed a dev-only concern it won't ever get used in production.
>> > 2. i said this on some other kip discussion: i think the nice thing about the pool API is it "scales" from just keeping a memory bound to actually re-using buffers without changing the calling code. i think actually pooling large buffers will result in a significant performance impact, but that's outside the scope of this kip. at that point i think more pool implementations (that actually pool) would be written. i agree with the ideal of exposing as few knobs as possible, but switching pools (or pool params) for tuning may happen at some later point.
>> >
>> > On Fri, Nov 11, 2016 at 11:44 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
>> >
>> > > 13. At the moment, I think channels are not muted if:
>> > >     channel.receive != null && channel.receive.buffer != null
>> > > This mutes all channels that aren't holding onto an incomplete buffer. They may or may not have read the 4-byte size.
>> > >
>> > > I was thinking you could avoid muting channels if:
>> > >     channel.receive == null || channel.receive.size.remaining()
>> > > This will not mute channels that are holding onto a buffer (as above). In addition, it will not mute channels that haven't read the 4-byte size. A client that is closed gracefully while the pool is full will not be muted in this case and the server can process close without waiting for the pool to free up. Once the 4-byte size is read, the channel will be muted if the pool is still out of memory - for each channel, at most one failed read attempt would be made while the pool is out of memory. I think this would also delay muting of SSL channels since they can continue to read into their (already allocated) network buffers and unwrap the data and block only when they need to allocate a buffer from the pool.
>> > >
>> > > On Fri, Nov 11, 2016 at 6:00 PM, Jay Kreps <j...@confluent.io> wrote:
>> > >
>> > > > Hey Radai,
>> > > >
>> > > > +1 on deprecating and eventually removing the old config. The intention was absolutely bounding memory usage. I think having two ways of doing this, one that gives a crisp bound on memory and one that is hard to reason about is pretty confusing. I think people will really appreciate having one config which instead lets them directly control the thing they actually care about (memory).
>> > > >
>> > > > I also want to second Jun's concern on the complexity of the self-GCing memory pool. I wrote the memory pool for the producer. In that area the pooling of messages is the single biggest factor in performance of the client so I believed it was worth some sophistication/complexity if there was performance payoff. All the same, the complexity of that code has made it VERY hard to keep correct (it gets broken roughly every other time someone makes a change). Over time I came to feel a lot less proud of my cleverness. I learned something interesting reading your self-GCing memory pool, but I wonder if the complexity is worth the payoff in this case?
>> > > >
>> > > > Philosophically we've tried really hard to avoid needlessly "pluggable" implementations. That is, when there is a temptation to give a config that plugs in different Java classes at run time for implementation choices, we should instead think of how to give the user the good behavior automatically. I think the use case for configuring the GCing pool would be if you discovered a bug in which memory leaked. But this isn't something the user should have to think about right? If there is a bug we should find and fix it.
>> > > >
>> > > > -Jay
>> > > >
>> > > > On Fri, Nov 11, 2016 at 9:21 AM, radai <radai.rosenbl...@gmail.com> wrote:
>> > > >
>> > > > > jun's #1 + rajini's #11 - the new config param is to enable changing the pool implementation class. as i said in my response to jun i will make the default pool impl be the simple one, and this param is to allow a user (more likely a dev) to change it.
>> > > > > both the simple pool and the "gc pool" make basically just an AtomicLong.get() + (hashmap.put for gc) calls before returning a buffer. there is absolutely no dependency on GC times in allocating (or not). the extra background thread in the gc pool is forever asleep unless there are bugs (==leaks) so the extra cost is basically nothing (backed by benchmarks). let me reiterate - ANY BUFFER ALLOCATED MUST ALWAYS BE RELEASED - so the gc pool should not rely on gc for reclaiming buffers. it's a bug detector, not a feature and is definitely not intended to hide bugs - the exact opposite - it's meant to expose them sooner. i've cleaned up the docs to avoid this confusion. i also like the fail on leak. will do.
>> > > > > as for the gap between pool size and heap size - that's a valid argument. may allow also sizing the pool as % of heap size? so queued.max.bytes = 1000000 for 1MB and queued.max.bytes = 0.25 for 25% of available heap?
>> > > > >
>> > > > > jun's 2.2 - queued.max.bytes + socket.request.max.bytes still holds, assuming the ssl-related buffers are small. the largest weakness in this claim has to do with decompression rather than anything ssl-related. so yes there is an O(#ssl connections * sslEngine packet size) component, but i think it's small. again - decompression should be the concern.
>> > > > >
>> > > > > rajini's #13 - interesting optimization. the problem is there's no knowing in advance what the _next_ request to come out of a socket is, so this would mute just those sockets that are 1. mutable and 2. have a buffer-demanding request for which we could not allocate a buffer. downside is that as-is this would cause the busy-loop on poll() that the mutes were supposed to prevent - or code would need to be added to ad-hoc mute a connection that was so-far unmuted but has now generated a memory-demanding request?
>> > > > >
>> > > > > On Fri, Nov 11, 2016 at 5:02 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
>> > > > >
>> > > > > > Radai,
>> > > > > >
>> > > > > > 11. The KIP talks about a new server configuration parameter *memory.pool.class.name* which is not in the implementation. Is it still the case that the pool will be configurable?
>> > > > > >
>> > > > > > 12. Personally I would prefer not to have a garbage collected pool that hides bugs as well. Apart from the added code complexity and extra thread to handle collections, I am also concerned about the non-deterministic nature of GC timings. The KIP introduces delays in processing requests based on the configuration parameter *queued.max.bytes*. This is unrelated to the JVM heap size and hence pool can be full when there is no pressure on the JVM to garbage collect. The KIP does not prevent other timeouts in the broker (eg. consumer session timeout) because it is relying on the pool to be managed in a deterministic, timely manner. Since a garbage collected pool cannot provide that guarantee, wouldn't it be better to run tests with a GC-pool that perhaps fails with a fatal error if it encounters a buffer that was not released?
>> > > > > >
>> > > > > > 13. The implementation currently mutes all channels that don't have a receive buffer allocated. Would it make sense to mute only the channels that need a buffer (i.e. allow channels to read the 4-byte size that is not read using the pool) so that normal client connection close() is handled even when the pool is full? Since the extra 4-bytes may already be allocated for some connections, the total request memory has to take into account *4*numConnections* bytes anyway.
>> > > > > >
>> > > > > > On Thu, Nov 10, 2016 at 11:51 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > >
>> > > > > > > Hi, Radai,
>> > > > > > >
>> > > > > > > 1. Yes, I am concerned about the trickiness of having to deal with weak refs. I think it's simpler to just have the simple version instrumented with enough debug/trace logging and do enough stress testing. Since we still have queued.max.requests, one can always fall back to that if a memory leak issue is identified. We could also label the feature as beta if we don't think this is production ready.
>> > > > > > >
>> > > > > > > 2.2 I am just wondering after we fix that issue whether the claim that the request memory is bounded by queued.max.bytes + socket.request.max.bytes is still true.
>> > > > > > >
>> > > > > > > 5. Ok, leaving the default as -1 is fine then.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > > On Wed, Nov 9, 2016 at 6:01 PM, radai <radai.rosenbl...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi Jun,
>> > > > > > > >
>> > > > > > > > Thank you for taking the time to review this.
>> > > > > > > >
>> > > > > > > > 1. short version - yes, the concern is bugs, but the cost is tiny and worth it, and it's a common pattern. long version:
>> > > > > > > >    1.1 detecting these types of bugs (leaks) cannot be easily done with simple testing, but requires stress/stability tests that run for a long time (long enough to hit OOM, depending on leak size and available memory). this is why some sort of leak detector is "standard practice". for example look at netty (http://netty.io/wiki/reference-counted-objects.html#leak-detection-levels) - they have way more complicated built-in leak detection enabled by default. as a concrete example - during development i did not properly dispose of in-progress KafkaChannel.receive when a connection was abruptly closed and I only found it because of the log msg printed by the pool.
>> > > > > > > >    1.2 I have a benchmark suite showing the performance cost of the gc pool is absolutely negligible - https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks
>> > > > > > > >    1.3 as for the complexity of the impl - it's just ~150 lines and pretty straightforward. i think the main issue is that not many people are familiar with weak refs and ref queues.
>> > > > > > > >
>> > > > > > > >    how about making the pool impl class a config param (generally good going forward), make the default be the simple pool, and keep the GC one as a dev/debug/triage aid?
>> > > > > > > >
>> > > > > > > > 2. the KIP itself doesn't specifically treat SSL at all - it's an implementation detail. as for my current patch, it has some minimal treatment of SSL - just enough to not mute SSL sockets mid-handshake - but the code in SslTransportLayer still allocates buffers itself. it is my understanding that netReadBuffer/appReadBuffer shouldn't grow beyond 2 x sslEngine.getSession().getPacketBufferSize(), which i assume to be small. they are also long lived (they live for the duration of the connection) which makes a poor fit for pooling. the bigger fish to fry i think is decompression - you could read a 1MB blob into a pool-provided buffer and then decompress it into 10MB of heap allocated on the spot :-) also, the ssl code is extremely tricky.
>> > > > > > > >    2.2 just to make sure, you're talking about Selector.java: while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); ? if so you're right, and i'll fix that (probably by something similar to immediatelyConnectedKeys, not sure yet)
>> > > > > > > >
>> > > > > > > > 3. isOutOfMemory is self explanatory (and i'll add javadocs and update the wiki). isLowOnMem is basically the point where I start randomizing the selection key handling order to avoid potential starvation. it's rather arbitrary and now that i think of it should probably not exist and be entirely contained in Selector (where the shuffling takes place). will fix.
>> > > > > > > >
>> > > > > > > > 4. will do.
>> > > > > > > >
>> > > > > > > > 5. I prefer -1 or 0 as an explicit "OFF" (or basically anything <=0). Long.MAX_VALUE would still create a pool, that would still waste time tracking resources. I don't really mind though if you have a preferred magic value for off.
>> > > > > > > >
>> > > > > > > > On Wed, Nov 9, 2016 at 9:28 AM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > >
>> > > > > > > > > Hi, Radai,
>> > > > > > > > >
>> > > > > > > > > Thanks for the KIP. Some comments below.
>> > > > > > > > >
>> > > > > > > > > 1. The KIP says "to facilitate faster implementation (as a safety net) the pool will be implemented in such a way that memory that was not release()ed (but still garbage collected) would be detected and "reclaimed". this is to prevent "leaks" in case of code paths that fail to release() properly.". What are the cases that could cause memory leaks? If we are concerned about bugs, it seems that it's better to just do more testing to make sure the usage of the simple implementation (SimpleMemoryPool) is solid instead of adding more complicated logic (GarbageCollectedMemoryPool) to hide the potential bugs.
>> > > > > > > > >
>> > > > > > > > > 2. I am wondering how much this KIP covers the SSL channel implementation.
>> > > > > > > > > 2.1 SslTransportLayer maintains netReadBuffer, netWriteBuffer, appReadBuffer per socket. Should that memory be accounted for in the memory pool?
>> > > > > > > > > 2.2 One tricky thing with SSL is that during a KafkaChannel.read(), it's possible for multiple NetworkReceives to be returned since multiple requests' data could be encrypted together by SSL. To deal with this, we stash those NetworkReceives in Selector.stagedReceives and give it back to the poll() call one NetworkReceive at a time. What this means is that, if we stop reading from KafkaChannel in the middle because memory pool is full, this channel's key may never get selected for reads (even after the read interest is turned on), but there is still pending data for the channel, which will never get processed.
>> > > > > > > > >
>> > > > > > > > > 3. The code has the following two methods in MemoryPool, which are not described in the KIP. Could you explain how they are used in the wiki?
>> > > > > > > > > isLowOnMemory()
>> > > > > > > > > isOutOfMemory()
>> > > > > > > > >
>> > > > > > > > > 4. Could you also describe in the KIP at the high level, how the read interest bit for the socket is turned on/off with respect to MemoryPool?
>> > > > > > > > >
>> > > > > > > > > 5. Should queued.max.bytes default to -1 or Long.MAX_VALUE?
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > > On Mon, Nov 7, 2016 at 1:08 PM, radai <radai.rosenbl...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi,
>> > > > > > > > > >
>> > > > > > > > > > I would like to initiate a vote on KIP-72:
>> > > > > > > > > >
>> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>> > > > > > > > > >
>> > > > > > > > > > The kip allows specifying a limit on the amount of memory allocated for reading incoming requests into. This is useful for "sizing" a broker and avoiding OOMEs under heavy load (as actually happens occasionally at linkedin).
>> > > > > > > > > >
>> > > > > > > > > > I believe I've addressed most (all?) concerns brought up during the discussion.
>> > > > > > > > > >
>> > > > > > > > > > To the best of my understanding this vote is about the goal and public-facing changes related to the new proposed behavior, but as for implementation, i have the code up here:
>> > > > > > > > > >
>> > > > > > > > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting
>> > > > > > > > > >
>> > > > > > > > > > and I've stress-tested it to work properly (meaning it chugs along and throttles under loads that would DOS 10.0.1.0 code).
>> > > > > > > > > >
>> > > > > > > > > > I also believe that the primitives and "pattern"s introduced in this KIP (namely the notion of a buffer pool and retrieving from / releasing to said pool instead of allocating memory) are generally useful beyond the scope of this KIP for both performance issues (allocating lots of short-lived large buffers is a performance bottleneck) and other areas where memory limits are a problem (KIP-81)
>> > > > > > > > > >
>> > > > > > > > > > Thank you,
>> > > > > > > > > >
>> > > > > > > > > > Radai.
>> > > > > >
>> > > > > > --
>> > > > > > Regards,
>> > > > > >
>> > > > > > Rajini
>> > >
>> > > --
>> > > Regards,
>> > >
>> > > Rajini
>>
>> --
>> Regards,
>>
>> Rajini
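
For readers following the thread, the pool contract it keeps returning to (allocate against a fixed byte budget, always release what was allocated, refuse when the budget is used up) can be sketched roughly as follows. This is an illustration only and not the SimpleMemoryPool from the patch: the class name BoundedPoolSketch and the methods tryAllocate, release and available are hypothetical. It also assumes a strict budget that is never exceeded, whereas the thread suggests the real bound is queued.max.bytes + socket.request.max.bytes.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a non-pooling, memory-bounding "pool".
// It only tracks a byte budget; buffers are never reused.
final class BoundedPoolSketch {
    private final long capacityBytes;
    private final AtomicLong availableBytes;

    BoundedPoolSketch(long capacityBytes) {
        if (capacityBytes <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacityBytes = capacityBytes;
        this.availableBytes = new AtomicLong(capacityBytes);
    }

    // Returns a buffer, or null if the request does not fit in the remaining budget.
    ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        while (true) {
            long available = availableBytes.get();
            if (available < sizeBytes) {
                return null; // the caller would stop reading (mute) until memory frees up
            }
            // Retry if another thread changed the budget between get() and here.
            if (availableBytes.compareAndSet(available, available - sizeBytes)) {
                return ByteBuffer.allocate(sizeBytes);
            }
        }
    }

    // Every allocated buffer must be released exactly once.
    void release(ByteBuffer buffer) {
        long now = availableBytes.addAndGet(buffer.capacity());
        if (now > capacityBytes) {
            throw new IllegalStateException("released more than was allocated");
        }
    }

    boolean isOutOfMemory() {
        return availableBytes.get() <= 0;
    }

    long available() {
        return availableBytes.get();
    }

    public static void main(String[] args) {
        BoundedPoolSketch pool = new BoundedPoolSketch(1_000_000L); // a 1 MB budget
        ByteBuffer first = pool.tryAllocate(600_000);
        ByteBuffer second = pool.tryAllocate(600_000); // does not fit while first is outstanding
        System.out.println("second allocation refused: " + (second == null));
        pool.release(first);
        System.out.println("bytes available after release: " + pool.available());
    }
}
```

A caller that gets null back would stop reading from that channel until another request releases its buffer, which is the muting behavior discussed above. Because callers only see allocate and release, a later implementation that actually reuses buffers could replace this one without changing them, which is the "scales" point made in the thread.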