the proposal looks good to me.

just out of curiosity, why does PoolingPolicy only have `UnpooledHeap` and
`PooledDirect`?

- Sijie

On Wed, Oct 10, 2018 at 12:51 PM Matteo Merli <mme...@apache.org> wrote:

> (CCed dev@bookkeeper.apache.org since this proposal is referring to both
> projects and I thought it might be good to have a single discussion thread)
>
> Wiki page is available at:
> https://github.com/apache/pulsar/wiki/PIP-24%3A-Simplify-memory-settings
>
> Pasting here for convenience:
>
> ------------------------------------------------------------
>
>
> ## Motivation
>
> Configuring the correct JVM memory settings and cache sizes for a Pulsar
> cluster should be
> simplified.
>
> There are currently many knobs in Netty or JVM flags for different
> components and while
> with a good setup the system is very stable, it's easy to set up
> non-optimal configurations
> which might result in OutOfMemory errors under load.
>
> Ideally, there should be very minimal configuration required to bring up a
> Pulsar cluster
> that can work under a wide set of traffic loads. In any case, we should
> prefer to automatically
> fall back to slower alternatives, when possible, instead of throwing OOM
> exceptions.
>
> ## Goals
>
>  1. Default settings should allow Pulsar to use all the memory as
> configured on the JVM,
>     irrespective of Direct vs Heap memory
>  2. Automatically set the size of caches based on the amount of memory
> available to the JVM
>  3. Allow disabling pooling completely for environments where memory is
> scarce
>  4. Allow configuring different policies to have different fallback
> options when the memory
>     quotas are reached
>
>
> ## Changes
>
> ### Netty Allocator Wrapper
>
> Create an allocator wrapper that can be configured with different
> behaviors. It will use
> the regular `PooledByteBufAllocator` but will have a configuration
> object to decide
> what to do in particular situations. It will also serve as a way to group and
> simplify all
> the Netty allocator options which are currently spread across multiple
> system properties,
> for which the documentation is not easily searchable.
>
> The wrapper will be configured and instantiated through a builder class:
>
> ```java
> public interface ByteBufAllocatorBuilder {
>
>     /**
>      * Creates a new {@link ByteBufAllocatorBuilder}.
>      */
>     public static ByteBufAllocatorBuilder create() {
>         return new ByteBufAllocatorBuilderImpl();
>     }
>
>     /**
>      * Finalize the configured {@link ByteBufAllocator}
>      */
>     ByteBufAllocator build();
>
>     /**
>      * Specify a custom allocator where the allocation requests should be
> forwarded to.
>      *
>      * <p>
>      * Default is to use {@link PooledByteBufAllocator#DEFAULT} when
> pooling is required or
>      * {@link UnpooledByteBufAllocator} otherwise.
>      */
>     ByteBufAllocatorBuilder allocator(ByteBufAllocator allocator);
>
>     /**
>      * Define the memory pooling policy
>      *
>      * <p>
>      * Default is {@link PoolingPolicy#PooledDirect}
>      */
>     ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);
>
>     /**
>      * Controls the amount of concurrency for the memory pool.
>      *
>      * <p>
>      * Default is to have a number of allocator arenas equal to 2 * CPUs.
>      * <p>
>      * Decreasing this number will reduce the amount of memory overhead, at
> the expense of increased allocation
>      * contention.
>      */
>     ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);
>
>     /**
>      * Define the OutOfMemory handling policy
>      *
>      * <p>
>      * Default is {@link OomPolicy#FallbackToHeap}
>      */
>     ByteBufAllocatorBuilder oomPolicy(OomPolicy policy);
>
>     /**
>      * Define the leak detection policy for the allocated buffers.
>      *
>      * <p>
>      * Default is {@link LeakDetectionPolicy#Disabled}
>      */
>     ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy
> leakDetectionPolicy);
> }
> ```
>
> The policies are defined as follows:
>
> ```java
> /**
>  * Define a policy for allocating buffers
>  */
> public enum PoolingPolicy {
>
>     /**
>      * Allocate memory from JVM heap without any pooling.
>      *
>      * This option has the least overhead in terms of memory usage since
> the memory will be automatically reclaimed by
>      * the JVM GC but might impose a performance penalty at high
> throughput.
>      */
>     UnpooledHeap,
>
>     /**
>      * Use Direct memory for all buffers and pool the memory.
>      *
>      * Direct memory will avoid the overhead of JVM GC and most memory
> copies when reading and writing to socket
>      * channel.
>      *
>      * Pooling will add memory space overhead due to the fact that there
> will be fragmentation in the allocator and that
>      * threads will keep a portion of memory as thread-local to avoid
> contention when possible.
>      */
>     PooledDirect
> }
>
> /**
>  * Represents the action to take when it's not possible to allocate memory.
>  */
> public enum OomPolicy {
>
>     /**
>      * Throw a regular OOM exception without taking additional actions
>      */
>     ThrowException,
>
>     /**
>      * If it's not possible to allocate a buffer from direct memory,
> fall back to allocating an unpooled buffer from the JVM
>      * heap.
>      *
>      * This will help absorb memory allocation spikes because the heap
> allocations will naturally slow down the process
>      * and will result in a full GC cleanup if the heap itself is full.
>      */
>     FallbackToHeap,
>
>     /**
>      * If it's not possible to allocate memory, kill the JVM process so
> that it can be restarted immediately.
>      *
>      */
>     KillProcess,
> }
>
> /**
>  * Define the policy for the Netty leak detector
>  */
> public enum LeakDetectionPolicy {
>
>     /**
>      * No leak detection and no overhead
>      */
>     Disabled,
>
>     /**
>      * Instruments 1% of the allocated buffers to track for leaks
>      */
>     Simple,
>
>     /**
>      * Instruments 1% of the allocated buffers to track for leaks, reporting
> stack traces of places where the buffer was
>      * used
>      */
>     Advanced,
>
>     /**
>      * Instruments 100% of the allocated buffers to track for leaks,
> reporting stack traces of places where the buffer
>      * was used. Introduces very significant overhead.
>      */
>     Paranoid,
> }
> ```
>
> It will be possible to create an allocator through the builder and then
> pass it
> to the Netty client/server, or use it to allocate buffers directly.
>
> ```java
> ByteBufAllocator allocator = ByteBufAllocatorBuilder.create()
>         .poolingPolicy(PoolingPolicy.PooledDirect)
>         .oomPolicy(OomPolicy.FallbackToHeap)
>         .leakDetectionPolicy(LeakDetectionPolicy.Disabled)
>         .build();
> ```
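>
> As an illustration of the intended `FallbackToHeap` behavior, here is a
> minimal sketch of how the wrapper could apply the `OomPolicy`. The class
> name `ByteBufAllocatorImpl` and its internals are placeholders, not the
> final implementation:
>
> ```java
> import io.netty.buffer.AbstractByteBufAllocator;
> import io.netty.buffer.ByteBuf;
> import io.netty.buffer.ByteBufAllocator;
> import io.netty.buffer.PooledByteBufAllocator;
> import io.netty.buffer.UnpooledByteBufAllocator;
>
> public class ByteBufAllocatorImpl extends AbstractByteBufAllocator {
>
>     private final ByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;
>     private final ByteBufAllocator unpooled = UnpooledByteBufAllocator.DEFAULT;
>     private final OomPolicy oomPolicy = OomPolicy.FallbackToHeap;
>
>     @Override
>     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
>         try {
>             return pooled.directBuffer(initialCapacity, maxCapacity);
>         } catch (OutOfMemoryError e) {
>             switch (oomPolicy) {
>             case FallbackToHeap:
>                 // Absorb the allocation spike with an unpooled heap buffer
>                 return unpooled.heapBuffer(initialCapacity, maxCapacity);
>             case KillProcess:
>                 Runtime.getRuntime().halt(1); // never returns
>             default:
>                 throw e;
>             }
>         }
>     }
>
>     @Override
>     protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
>         return pooled.heapBuffer(initialCapacity, maxCapacity);
>     }
>
>     @Override
>     public boolean isDirectBufferPooled() {
>         return true;
>     }
> }
> ```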
>
> ### Component changes
>
> In addition to using the policy-based allocator wrapper, each component
> will have
> additional changes.
>
> #### Pulsar broker
>
> Add options in `broker.conf` to configure the
> allocator. E.g.:
>
> ```properties
> allocatorPoolingPolicy=PooledDirect
> allocatorPoolingConcurrency=4
> allocatorOomPolicy=FallbackToHeap
> allocatorLeakDetectionPolicy=Disabled
> ```
>
> ##### Managed ledger cache
>
> Currently, in the Pulsar broker, the only memory pooled from the direct memory
> region, in
> addition to regular IO buffers, is the ManagedLedgerCache. This cache is used
> to dispatch
> directly to consumers (once a message is persisted), avoiding reads from
> bookies for
> consumers that are caught up with producers.
>
> By default, the managed ledger cache size will be set to 1/3rd of the total
> available
> direct memory (or heap if pooling is disabled).
>
> The setting will be left empty to indicate the default dynamic behavior:
>
> ```
> managedLedgerCacheSizeMb=
> ```
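>
> A minimal sketch of how that dynamic default could be computed, assuming
> the 1/3 ratio above (the class and method names are illustrative only):
>
> ```java
> import io.netty.util.internal.PlatformDependent;
>
> public class ManagedLedgerCacheDefaults {
>     /**
>      * Sketch: default cache size is 1/3 of the memory region the cache is
>      * allocated from (direct memory with pooling, JVM heap otherwise).
>      */
>     static long defaultCacheSizeBytes(boolean poolingEnabled) {
>         long region = poolingEnabled
>                 ? PlatformDependent.maxDirectMemory() // -XX:MaxDirectMemorySize
>                 : Runtime.getRuntime().maxMemory();   // -Xmx
>         return region / 3;
>     }
> }
> ```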
>
> #### BookKeeper Client
>
> Add options to configure the allocator in `ClientConfiguration` object.
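>
> These option names do not exist yet; a hypothetical shape, mirroring the
> `broker.conf` keys above:
>
> ```java
> // Sketch only: hypothetical new setters on the BookKeeper client
> // configuration; the names are not final.
> ClientConfiguration conf = new ClientConfiguration();
> conf.setAllocatorPoolingPolicy(PoolingPolicy.PooledDirect);
> conf.setAllocatorPoolingConcurrency(4);
> conf.setAllocatorOomPolicy(OomPolicy.FallbackToHeap);
> conf.setAllocatorLeakDetectionPolicy(LeakDetectionPolicy.Disabled);
> ```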
>
> #### Bookie
>
> Add options to configure the allocator in `ServerConfiguration` object and
> `bookkeeper.conf`.
>
> By default, in Pulsar we configure BookKeeper to use DbLedgerStorage. This
> storage
> implementation has two main sources of memory allocation: the read and write
> caches.
>
> By default, the configured direct memory region will be divided into 3
> portions:
>  * IO buffers - 50%, capped at 4 GB
>  * Write cache - 25%
>  * Read cache - 25%
>
> If there is a lot of direct memory available, at most 4 GB will be assigned to
> IO buffers and
> the rest will be split between read and write caches.
>
> This will still not take into account the memory used by the RocksDB block
> cache, since this
> will be allocated from within the JNI library and not accounted for in JVM
> heap or
> direct memory regions.
>
> The rule of thumb here would be to default to a size pegged to the direct
> memory size,
> say 1/5th of it.
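>
> A minimal sketch of the default split described above (4 GB cap on IO
> buffers, the remainder split evenly, and the 1/5 rule of thumb for the
> RocksDB block cache; names are illustrative only):
>
> ```java
> import io.netty.util.internal.PlatformDependent;
>
> public class DbLedgerStorageMemoryDefaults {
>
>     private static final long MAX_IO_BUFFERS = 4L * 1024 * 1024 * 1024;
>
>     public static void main(String[] args) {
>         long directMemory = PlatformDependent.maxDirectMemory();
>
>         // IO buffers: 50% of direct memory, capped at 4 GB
>         long ioBuffers = Math.min(directMemory / 2, MAX_IO_BUFFERS);
>
>         // Remainder is split evenly between write and read caches
>         long writeCache = (directMemory - ioBuffers) / 2;
>         long readCache = directMemory - ioBuffers - writeCache;
>
>         // RocksDB block cache lives outside JVM accounting (JNI);
>         // rule of thumb: peg it to 1/5 of the direct memory size
>         long rocksDbBlockCache = directMemory / 5;
>
>         System.out.printf("io=%d write=%d read=%d rocksdb=%d%n",
>                 ioBuffers, writeCache, readCache, rocksDbBlockCache);
>     }
> }
> ```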
>
> #### Pulsar Client
>
> Add options to configure allocator policies in `PulsarClientBuilder`.
>
> Additionally, for `PulsarClient` we should be able to define a max amount
> of memory
> that a single instance is allowed to use.
>
> This memory will be used when accumulating messages in the producers'
> pending message
> queues or the consumers' receive queues.
>
> When the assigned client memory is filled up, some actions will be taken:
>
>  * For producers, it would be the same as the producer queue full condition,
> with either
>    an immediate send error or blocking behavior, depending on the existing
> configuration.
>  * For consumers, the flow control mechanism will be slowed down by not
> asking the
>    brokers for more messages once the memory is full.
>
> A reasonable default might be to use 64 MB per client instance, which will
> be shared
> across all producers and consumers created by that instance.
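>
> Such an option does not exist on the client builder yet; a hypothetical
> sketch of how it could look:
>
> ```java
> import org.apache.pulsar.client.api.PulsarClient;
> import org.apache.pulsar.client.api.PulsarClientException;
>
> public class ClientMemoryLimitExample {
>     public static void main(String[] args) throws PulsarClientException {
>         // Sketch only: memoryLimitMb() is a hypothetical option, shared
>         // across all producers and consumers created by this instance
>         PulsarClient client = PulsarClient.builder()
>                 .serviceUrl("pulsar://localhost:6650")
>                 .memoryLimitMb(64)
>                 .build();
>     }
> }
> ```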
>
> --
> Matteo Merli
> <mme...@apache.org>
>
