The proposal looks good to me. Just out of curiosity, why does `PoolingPolicy` only have `UnpooledHeap` and `PooledDirect`?
- Sijie

On Wed, Oct 10, 2018 at 12:51 PM Matteo Merli <mme...@apache.org> wrote:

(CCed d...@bookkeeper.apache.org since this proposal refers to both projects and I thought it might be good to have a single discussion thread)

Wiki page is available at:
https://github.com/apache/pulsar/wiki/PIP-24%3A-Simplify-memory-settings

Pasting here for convenience:

------------------------------------------------------------

## Motivation

Configuring the correct JVM memory settings and cache sizes for a Pulsar cluster should be simplified.

There are currently many knobs in Netty or JVM flags for the different components, and while the system is very stable with a good setup, it is easy to end up with a non-optimal configuration that can result in OutOfMemory errors under load.

Ideally, very little configuration should be required to bring up a Pulsar cluster that can work under a wide range of traffic loads. In any case, we should prefer to automatically fall back to slower alternatives, when possible, instead of throwing OOM exceptions.

## Goals

1. Default settings should allow Pulsar to use all the memory configured on the JVM, irrespective of direct vs. heap memory
2. Automatically set the size of caches based on the amount of memory available to the JVM
3. Allow pooling to be disabled completely for environments where memory is scarce
4. Allow different policies to be configured, with different fallback options, for when the memory quotas are reached

## Changes

### Netty Allocator Wrapper

Create an allocator wrapper that can be configured with different behaviors. It will use the regular `PooledByteBufAllocator` but will have a configuration object that decides what to do in particular situations. It will also serve as a way to group and simplify all the Netty allocator options, which are currently spread across multiple system properties for which the documentation is not easily searchable.

The wrapper will be configured and instantiated through a builder class:

```java
public interface ByteBufAllocatorBuilder {

    /**
     * Creates a new {@link ByteBufAllocatorBuilder}.
     */
    public static ByteBufAllocatorBuilder create() {
        return new ByteBufAllocatorBuilderImpl();
    }

    /**
     * Finalize the configured {@link ByteBufAllocator}.
     */
    ByteBufAllocator build();

    /**
     * Specify a custom allocator to which the allocation requests should be forwarded.
     *
     * <p>Default is to use {@link PooledByteBufAllocator#DEFAULT} when pooling is required, or
     * {@link UnpooledByteBufAllocator} otherwise.
     */
    ByteBufAllocatorBuilder allocator(ByteBufAllocator allocator);

    /**
     * Define the memory pooling policy.
     *
     * <p>Default is {@link PoolingPolicy#PooledDirect}.
     */
    ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);

    /**
     * Controls the amount of concurrency for the memory pool.
     *
     * <p>Default is to have a number of allocator arenas equal to 2 * CPUS.
     *
     * <p>Decreasing this number will reduce the amount of memory overhead, at the expense of increased
     * allocation contention.
     */
    ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);

    /**
     * Define the OutOfMemory handling policy.
     *
     * <p>Default is {@link OomPolicy#FallbackToHeap}.
     */
    ByteBufAllocatorBuilder oomPolicy(OomPolicy policy);

    /**
     * Enable leak detection for the allocator.
     *
     * <p>Default is {@link LeakDetectionPolicy#Disabled}.
     */
    ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy);
}
```

The policies are defined here:

```java
/**
 * Define a policy for allocating buffers.
 */
public enum PoolingPolicy {

    /**
     * Allocate memory from the JVM heap without any pooling.
     *
     * This option has the least overhead in terms of memory usage, since the memory will be automatically
     * reclaimed by the JVM GC, but it might impose a performance penalty at high throughput.
     */
    UnpooledHeap,

    /**
     * Use direct memory for all buffers and pool the memory.
     *
     * Direct memory will avoid the overhead of JVM GC and most memory copies when reading from and writing
     * to socket channels.
     *
     * Pooling will add memory space overhead due to the fact that there will be fragmentation in the
     * allocator and that threads will keep a portion of memory as thread-local to avoid contention when
     * possible.
     */
    PooledDirect
}

/**
 * Represents the action to take when it's not possible to allocate memory.
 */
public enum OomPolicy {

    /**
     * Throw a regular OOM exception without taking additional actions.
     */
    ThrowException,

    /**
     * If it's not possible to allocate a buffer from direct memory, fall back to allocating an unpooled
     * buffer from the JVM heap.
     *
     * This will help absorb memory allocation spikes, because the heap allocations will naturally slow down
     * the process and will result in a full GC cleanup if the heap itself is full.
     */
    FallbackToHeap,

    /**
     * If it's not possible to allocate memory, kill the JVM process so that it can be restarted immediately.
     */
    KillProcess,
}

/**
 * Define the policy for the Netty leak detector.
 */
public enum LeakDetectionPolicy {

    /**
     * No leak detection and no overhead.
     */
    Disabled,

    /**
     * Instruments 1% of the allocated buffers to track for leaks.
     */
    Simple,

    /**
     * Instruments 1% of the allocated buffers to track for leaks, reporting stack traces of the places
     * where the buffer was used.
     */
    Advanced,

    /**
     * Instruments 100% of the allocated buffers to track for leaks, reporting stack traces of the places
     * where the buffer was used. Introduces very significant overhead.
     */
    Paranoid,
}
```

It will be possible to create an allocator through the builder and then pass it to the Netty client/server, or just allocate buffers from it directly:

```java
ByteBufAllocator allocator = ByteBufAllocatorBuilder.create()
        .poolingPolicy(PoolingPolicy.PooledDirect)
        .oomPolicy(OomPolicy.FallbackToHeap)
        .leakDetectionPolicy(LeakDetectionPolicy.Disabled)
        .build();
```
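To make the `FallbackToHeap` policy concrete, here is a minimal sketch (not part of the proposal; the class and field names are illustrative only) of how the wrapper could intercept a failed direct allocation and retry against an unpooled heap allocator:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

// Illustrative sketch only: the real wrapper would be driven by the builder's policies.
public class FallbackAllocatorSketch {

    private final ByteBufAllocator pooledDirect = PooledByteBufAllocator.DEFAULT;
    private final ByteBufAllocator unpooledHeap = UnpooledByteBufAllocator.DEFAULT;

    public ByteBuf buffer(int initialCapacity) {
        try {
            // Normal path: pooled direct buffer (PoolingPolicy.PooledDirect).
            return pooledDirect.directBuffer(initialCapacity);
        } catch (OutOfMemoryError oom) {
            // OomPolicy.FallbackToHeap: absorb the spike with an unpooled heap buffer,
            // letting GC pressure slow the process down instead of failing the caller.
            return unpooledHeap.heapBuffer(initialCapacity);
        }
    }
}
```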
### Component changes

In addition to using the policy-based allocator wrapper, each component will have additional changes.

#### Pulsar broker

Add configuration options in `broker.conf` to allow configuration of the allocator. E.g.:

```properties
allocatorPoolingPolicy=PooledDirect
allocatorPoolingConcurrency=4
allocatorOomPolicy=FallbackToHeap
allocatorLeakDetectionPolicy=Disabled
```

##### Managed ledger cache

Currently, in the Pulsar broker, the only memory pooled from the direct memory region, in addition to the regular IO buffers, is the managed ledger cache. This cache is used to dispatch messages directly to consumers (once a message is persisted), avoiding reads from bookies for consumers that are caught up with producers.

By default, the managed ledger cache size will be set to 1/3rd of the total available direct memory (or heap, if pooling is disabled).

The setting will be left empty to indicate the default dynamic behavior:

```
managedLedgerCacheSizeMb=
```

#### BookKeeper Client

Add options to configure the allocator in the `ClientConfiguration` object.

#### Bookie

Add options to configure the allocator in the `ServerConfiguration` object and in `bookkeeper.conf`.

By default, in Pulsar we configure BookKeeper to use DbLedgerStorage. This storage implementation has two main sources of memory allocations: the read and write caches.

By default, the configured direct memory region will be divided into 3 portions:
* IO buffers - 50%, capped at 4 GB
* Write cache - 25%
* Read cache - 25%

If there is a lot of direct memory available, at most 4 GB will be assigned to IO buffers and the rest will be split between the read and write caches.

This will still not take into account the memory used by the RocksDB block cache, since that will be allocated from within the JNI library and not accounted for in the JVM heap or direct memory regions.

The rule of thumb here would be to default to a size pegged to the direct memory size, say 1/5th of it.
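As a rough illustration of the default sizing described above (a sketch only; the class name and the 8 GB figure are made up for the example), the split of the direct memory region could be computed like this:

```java
// Sketch of the proposed default split of direct memory between Netty IO buffers
// and the DbLedgerStorage write/read caches, plus the RocksDB block cache rule of thumb.
public class BookieMemorySplitSketch {

    private static final long MB = 1024L * 1024;
    private static final long GB = 1024L * MB;

    public static void main(String[] args) {
        long directMemory = 8 * GB; // e.g. -XX:MaxDirectMemorySize=8g

        // IO buffers: 50% of direct memory, capped at 4 GB.
        long ioBuffers = Math.min(directMemory / 2, 4 * GB);

        // Whatever is left is split evenly between the write and read caches,
        // so with a large direct memory region each cache gets more than 25%.
        long remaining = directMemory - ioBuffers;
        long writeCache = remaining / 2;
        long readCache = remaining - writeCache;

        // RocksDB block cache lives outside the JVM-tracked regions; rule of thumb is 1/5th.
        long rocksDbBlockCache = directMemory / 5;

        System.out.printf("io=%dMB write=%dMB read=%dMB rocksdb=%dMB%n",
                ioBuffers / MB, writeCache / MB, readCache / MB, rocksDbBlockCache / MB);
    }
}
```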
#### Pulsar Client

Add options to configure allocator policies in `PulsarClientBuilder`.

Additionally, for `PulsarClient` we should be able to define a maximum amount of memory that a single instance is allowed to use.

This memory will be used when accumulating messages in the producers' pending-messages queues or in the consumers' receive queues.

When the assigned client memory fills up, some actions will be taken:

* For producers, it would be the same as the producer-queue-full condition, with either an immediate send error or blocking behavior, depending on the existing configuration.
* For consumers, the flow control mechanism will be slowed down, by not asking the brokers for more messages once the memory is full.

A reasonable default might be 64 MB per client instance, which will be shared across all producers and consumers created by that instance.

--
Matteo Merli
<mme...@apache.org>