sijie closed pull request #1754: Netty allocator wrapper
URL: https://github.com/apache/bookkeeper/pull/1754
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-common-allocator/pom.xml 
b/bookkeeper-common-allocator/pom.xml
new file mode 100644
index 0000000000..a98889923e
--- /dev/null
+++ b/bookkeeper-common-allocator/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper</groupId>
+    <artifactId>bookkeeper</artifactId>
+    <version>4.9.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>bookkeeper-common-allocator</artifactId>
+  <name>Apache BookKeeper :: Common :: Allocator</name>
+  <dependencies>
+     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
new file mode 100644
index 0000000000..d749efd5ce
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
+
+/**
+ * Builder object to customize a ByteBuf allocator.
+ */
+public interface ByteBufAllocatorBuilder {
+    /**
+     * Creates a new {@link ByteBufAllocatorBuilder}.
+     */
+    static ByteBufAllocatorBuilder create() {
+        return new ByteBufAllocatorBuilderImpl();
+    }
+
+    /**
+     * Finalize the configured {@link ByteBufAllocator}.
+     */
+    ByteBufAllocator build();
+
+    /**
+     * Specify a custom allocator where the pooled allocation requests should
+     * be forwarded to.
+     *
+     * <p>Default is to use a new instance of {@link PooledByteBufAllocator}.
+     */
+    ByteBufAllocatorBuilder pooledAllocator(ByteBufAllocator pooledAllocator);
+
+    /**
+     * Specify a custom allocator where the unpooled allocation requests should
+     * be forwarded to.
+     *
+     * <p>Default is to use {@link UnpooledByteBufAllocator#DEFAULT}.
+     */
+    ByteBufAllocatorBuilder unpooledAllocator(ByteBufAllocator 
unpooledAllocator);
+
+    /**
+     * Define the memory pooling policy.
+     *
+     * <p>Default is {@link PoolingPolicy#PooledDirect}
+     */
+    ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);
+
+    /**
+     * Controls the amount of concurrency for the memory pool.
+     *
+     * <p>Default is to have a number of allocator arenas equal to 2 * CPUs.
+     *
+     * <p>Decreasing this number will reduce the amount of memory overhead, at 
the
+     * expense of increased allocation contention.
+     */
+    ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);
+
+    /**
+     * Define the OutOfMemory handling policy.
+     *
+     * <p>Default is {@link OutOfMemoryPolicy#FallbackToHeap}
+     */
+    ByteBufAllocatorBuilder outOfMemoryPolicy(OutOfMemoryPolicy policy);
+
+    /**
+     * Add a listener that is triggered whenever there is an allocation 
failure.
+     *
+     * <p>Application can use this to trigger alerting or process restarting.
+     */
+    ByteBufAllocatorBuilder outOfMemoryListener(Consumer<OutOfMemoryError> 
listener);
+
+    /**
+     * Enable the leak detection for the allocator.
+     *
+     * <p>Default is {@link LeakDetectionPolicy#Disabled}
+     */
+    ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy 
leakDetectionPolicy);
+}
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java
new file mode 100644
index 0000000000..476684778a
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Define the policy for the Netty leak detector.
+ */
+public enum LeakDetectionPolicy {
+
+    /**
+     * No leak detection and no overhead.
+     */
+    Disabled,
+
+    /**
+     * Instruments 1% of the allocated buffers to track for leaks.
+     */
+    Simple,
+
+    /**
+     * Instruments 1% of the allocated buffers to track for leaks, reporting
+     * stack traces of places where the buffer was used.
+     */
+    Advanced,
+
+    /**
+     * Instruments 100% of the allocated buffers to track for leaks, reporting
+     * stack traces of places where the buffer was used. Introduces very
+     * significant overhead.
+     */
+    Paranoid,
+}
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java
new file mode 100644
index 0000000000..ff72050709
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Represents the action to take when it's not possible to allocate memory.
+ */
+public enum OutOfMemoryPolicy {
+
+    /**
+     * Throw regular OOM exception without taking additional actions.
+     */
+    ThrowException,
+
+    /**
+     * If it's not possible to allocate a buffer from direct memory, fall back to
+     * allocate an unpooled buffer from JVM heap.
+     *
+     * <p>This will help absorb memory allocation spikes because the heap
+     * allocations will naturally slow down the process and will result in a full
+     * GC cleanup if the Heap itself is full.
+     */
+    FallbackToHeap,
+}
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java
new file mode 100644
index 0000000000..352a55ed1e
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Define a policy for allocating buffers.
+ */
+public enum PoolingPolicy {
+
+    /**
+     * Allocate memory from JVM heap without any pooling.
+     *
+     * <p>This option has the least overhead in terms of memory usage since the
+     * memory will be automatically reclaimed by the JVM GC but might impose a
+     * performance penalty at high throughput.
+     */
+    UnpooledHeap,
+
+    /**
+     * Use Direct memory for all buffers and pool the memory.
+     *
+     * <p>Direct memory will avoid the overhead of JVM GC and most memory 
copies
+     * when reading and writing to socket channel.
+     *
+     * <p>Pooling will add memory space overhead due to the fact that there 
will be
+     * fragmentation in the allocator and that threads will keep a portion of
+     * memory as thread-local to avoid contention when possible.
+     */
+    PooledDirect
+}
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
new file mode 100644
index 0000000000..fc6bd9dc25
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import io.netty.buffer.ByteBufAllocator;
+
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+
+/**
+ * Implementation of {@link ByteBufAllocatorBuilder}.
+ */
+public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder {
+
+    ByteBufAllocator pooledAllocator = null;
+    ByteBufAllocator unpooledAllocator = null;
+    PoolingPolicy poolingPolicy = PoolingPolicy.PooledDirect;
+    int poolingConcurrency = 2 * Runtime.getRuntime().availableProcessors();
+    OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
+    Consumer<OutOfMemoryError> outOfMemoryListener = null;
+    LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
+
+    @Override
+    public ByteBufAllocator build() {
+        return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, 
poolingPolicy, poolingConcurrency,
+                outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder pooledAllocator(ByteBufAllocator 
pooledAllocator) {
+        this.pooledAllocator = pooledAllocator;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder unpooledAllocator(ByteBufAllocator 
unpooledAllocator) {
+        this.unpooledAllocator = unpooledAllocator;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy) {
+        this.poolingPolicy = policy;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency) {
+        this.poolingConcurrency = poolingConcurrency;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder outOfMemoryPolicy(OutOfMemoryPolicy policy) 
{
+        this.outOfMemoryPolicy = policy;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder 
outOfMemoryListener(Consumer<OutOfMemoryError> listener) {
+        this.outOfMemoryListener = listener;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy 
leakDetectionPolicy) {
+        this.leakDetectionPolicy = leakDetectionPolicy;
+        return this;
+    }
+
+}
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
new file mode 100644
index 0000000000..35441659a0
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetector.Level;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ByteBufAllocator}.
+ */
+public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements 
ByteBufAllocator {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ByteBufAllocatorImpl.class);
+
+    private final ByteBufAllocator pooledAllocator;
+    private final ByteBufAllocator unpooledAllocator;
+    private final PoolingPolicy poolingPolicy;
+    private final OutOfMemoryPolicy outOfMemoryPolicy;
+    private final Consumer<OutOfMemoryError> outOfMemoryListener;
+
+    ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator 
unpooledAllocator,
+            PoolingPolicy poolingPolicy, int poolingConcurrency, 
OutOfMemoryPolicy outOfMemoryPolicy,
+            Consumer<OutOfMemoryError> outOfMemoryListener,
+            LeakDetectionPolicy leakDetectionPolicy) {
+        super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
+
+        this.poolingPolicy = poolingPolicy;
+        this.outOfMemoryPolicy = outOfMemoryPolicy;
+        if (outOfMemoryListener == null) {
+            this.outOfMemoryListener = (v) -> {
+                log.error("Unable to allocate memory", v);
+            };
+        } else {
+            this.outOfMemoryListener = outOfMemoryListener;
+        }
+
+        if (poolingPolicy == PoolingPolicy.PooledDirect) {
+            if (pooledAllocator == null) {
+                this.pooledAllocator = new PooledByteBufAllocator(
+                        true /* preferDirect */,
+                        poolingConcurrency /* nHeapArena */,
+                        poolingConcurrency /* nDirectArena */,
+                        PooledByteBufAllocator.defaultPageSize(),
+                        PooledByteBufAllocator.defaultMaxOrder(),
+                        PooledByteBufAllocator.defaultTinyCacheSize(),
+                        PooledByteBufAllocator.defaultSmallCacheSize(),
+                        PooledByteBufAllocator.defaultNormalCacheSize(),
+                        PooledByteBufAllocator.defaultUseCacheForAllThreads());
+            } else {
+                this.pooledAllocator = pooledAllocator;
+            }
+        } else {
+            this.pooledAllocator = null;
+        }
+
+        this.unpooledAllocator = (unpooledAllocator != null) ? 
unpooledAllocator : UnpooledByteBufAllocator.DEFAULT;
+
+        // The setting is static in Netty, so it will actually affect all
+        // allocators
+        switch (leakDetectionPolicy) {
+        case Disabled:
+            if (log.isDebugEnabled()) {
+                log.debug("Disable Netty allocator leak detector");
+            }
+            ResourceLeakDetector.setLevel(Level.DISABLED);
+            break;
+
+        case Simple:
+            log.info("Setting Netty allocator leak detector to Simple");
+            ResourceLeakDetector.setLevel(Level.SIMPLE);
+            break;
+
+        case Advanced:
+            log.info("Setting Netty allocator leak detector to Advanced");
+            ResourceLeakDetector.setLevel(Level.ADVANCED);
+            break;
+
+        case Paranoid:
+            log.info("Setting Netty allocator leak detector to Paranoid");
+            ResourceLeakDetector.setLevel(Level.PARANOID);
+            break;
+        }
+    }
+
+    @Override
+    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+        try {
+            // There are few cases in which we ask explicitly for a pooled
+            // heap buffer.
+            ByteBufAllocator alloc = (poolingPolicy == 
PoolingPolicy.PooledDirect) ? pooledAllocator
+                    : unpooledAllocator;
+            return alloc.heapBuffer(initialCapacity, maxCapacity);
+        } catch (OutOfMemoryError e) {
+            outOfMemoryListener.accept(e);
+            throw e;
+        }
+    }
+
+    @Override
+    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+        if (poolingPolicy == PoolingPolicy.PooledDirect) {
+            try {
+                return pooledAllocator.directBuffer(initialCapacity, 
maxCapacity);
+            } catch (OutOfMemoryError e) {
+                switch (outOfMemoryPolicy) {
+                case ThrowException:
+                    outOfMemoryListener.accept(e);
+                    throw e;
+
+                case FallbackToHeap:
+                    try {
+                        return unpooledAllocator.heapBuffer(initialCapacity, 
maxCapacity);
+                    } catch (OutOfMemoryError e2) {
+                        outOfMemoryListener.accept(e2);
+                        throw e2;
+                    }
+                }
+                return null;
+            }
+        } else {
+            // Unpooled heap buffer. Force heap buffers because unpooled direct
+            // buffers have very high overhead of allocation/reclaiming
+            try {
+                return unpooledAllocator.heapBuffer(initialCapacity, 
maxCapacity);
+            } catch (OutOfMemoryError e) {
+                outOfMemoryListener.accept(e);
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public boolean isDirectBufferPooled() {
+        return pooledAllocator != null && 
pooledAllocator.isDirectBufferPooled();
+    }
+}
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java
new file mode 100644
index 0000000000..10133096cc
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implements the utilities for allocator used across the project.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
\ No newline at end of file
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java
new file mode 100644
index 0000000000..512911402d
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the utilities for allocator used across the project.
+ */
+package org.apache.bookkeeper.common.allocator;
\ No newline at end of file
diff --git 
a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
 
b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
new file mode 100644
index 0000000000..8ff66c3317
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ByteBufAllocatorBuilderImpl}.
+ *
+ * <p>The out-of-memory scenarios use Mockito mocks of {@link ByteBufAllocator} whose
+ * allocation methods throw, so no real memory exhaustion is required.
+ */
+public class ByteBufAllocatorBuilderTest {
+
+    /**
+     * Instance of Netty's {@code OutOfDirectMemoryError}, created reflectively through its
+     * {@code String} constructor, which is opened with {@code setAccessible(true)}.
+     */
+    private static final OutOfMemoryError outOfDirectMemException;
+
+    static {
+        try {
+            Class<?> clazz = ByteBufAllocatorBuilderTest.class.getClassLoader()
+                    .loadClass("io.netty.util.internal.OutOfDirectMemoryError");
+            @SuppressWarnings("unchecked")
+            Constructor<OutOfMemoryError> constructor = (Constructor<OutOfMemoryError>) clazz
+                    .getDeclaredConstructor(String.class);
+
+            constructor.setAccessible(true);
+            outOfDirectMemException = constructor.newInstance("no mem");
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * With {@link OutOfMemoryPolicy#ThrowException}, a direct-memory OOM raised by the pooled
+     * allocator must reach the caller and must also be reported to the listener.
+     */
+    @Test
+    public void testOomWithException() {
+        ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+        when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .pooledAllocator(baseAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
+                .outOfMemoryListener(receivedException::set)
+                .build();
+
+        try {
+            alloc.buffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected: the very same error instance is propagated
+            assertSame(outOfDirectMemException, e);
+        }
+
+        // Ensure the notification was triggered even when exception is thrown
+        assertSame(outOfDirectMemException, receivedException.get());
+    }
+
+    /**
+     * With {@link OutOfMemoryPolicy#FallbackToHeap}, a direct-memory OOM from the pooled
+     * allocator is absorbed: the buffer comes from the unpooled allocator and the listener
+     * is not notified.
+     */
+    @Test
+    public void testOomWithFallback() {
+        ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+        when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .pooledAllocator(baseAlloc)
+                .unpooledAllocator(UnpooledByteBufAllocator.DEFAULT)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener(receivedException::set)
+                .build();
+
+        // Should not throw exception
+        ByteBuf buf = alloc.buffer();
+        assertEquals(UnpooledByteBufAllocator.DEFAULT, buf.alloc());
+        buf.release();
+
+        // No notification should have been triggered
+        assertNull(receivedException.get());
+    }
+
+    /**
+     * With {@link OutOfMemoryPolicy#FallbackToHeap}, when the heap fallback fails as well,
+     * the heap error is thrown to the caller and reported to the listener.
+     */
+    @Test
+    public void testOomWithFallbackAndNoMoreHeap() {
+        ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+        when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+        ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .pooledAllocator(baseAlloc)
+                .unpooledAllocator(heapAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener(receivedException::set)
+                .build();
+
+        try {
+            alloc.buffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected: the heap failure, not the original direct-memory failure
+            assertSame(noHeapError, e);
+        }
+
+        // Ensure the notification was triggered even when exception is thrown
+        assertSame(noHeapError, receivedException.get());
+    }
+
+    /**
+     * With {@link PoolingPolicy#UnpooledHeap}, {@code directBuffer()} is expected to surface
+     * the OOM raised by the unpooled allocator's {@code heapBuffer} and to notify the listener.
+     */
+    @Test
+    public void testOomUnpooled() {
+        ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.UnpooledHeap)
+                .unpooledAllocator(heapAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener(receivedException::set)
+                .build();
+
+        try {
+            alloc.directBuffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected
+            assertSame(noHeapError, e);
+        }
+
+        // Ensure the notification was triggered even when exception is thrown
+        assertSame(noHeapError, receivedException.get());
+    }
+
+    /**
+     * Same setup as {@link #testOomUnpooled()}, but requesting {@code heapBuffer()}
+     * explicitly: the heap OOM is thrown to the caller and reported to the listener.
+     */
+    @Test
+    public void testOomUnpooledWithHeap() {
+        ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.UnpooledHeap)
+                .unpooledAllocator(heapAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener(receivedException::set)
+                .build();
+
+        try {
+            alloc.heapBuffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected
+            assertSame(noHeapError, e);
+        }
+
+        // Ensure the notification was triggered even when exception is thrown
+        assertSame(noHeapError, receivedException.get());
+    }
+
+    /**
+     * With {@link PoolingPolicy#UnpooledHeap} and no explicit allocator, both {@code buffer()}
+     * and {@code directBuffer()} return array-backed buffers from
+     * {@link UnpooledByteBufAllocator#DEFAULT}.
+     */
+    @Test
+    public void testUnpooled() {
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.UnpooledHeap)
+                .build();
+
+        ByteBuf buf = alloc.buffer();
+        assertEquals(UnpooledByteBufAllocator.DEFAULT, buf.alloc());
+        assertTrue(buf.hasArray());
+        buf.release();
+
+        ByteBuf buf2 = alloc.directBuffer();
+        assertEquals(UnpooledByteBufAllocator.DEFAULT, buf2.alloc());
+        assertTrue(buf2.hasArray());
+        buf2.release();
+    }
+
+    /**
+     * With {@link PoolingPolicy#PooledDirect} and an explicit pooled allocator, every buffer
+     * comes from that allocator: {@code buffer()} and {@code directBuffer()} are not
+     * array-backed, while {@code heapBuffer()} is.
+     */
+    @Test
+    public void testPooled() {
+        PooledByteBufAllocator pooledAlloc = new PooledByteBufAllocator(true);
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.PooledDirect)
+                .pooledAllocator(pooledAlloc)
+                .build();
+
+        assertTrue(alloc.isDirectBufferPooled());
+
+        ByteBuf buf1 = alloc.buffer();
+        assertEquals(pooledAlloc, buf1.alloc());
+        assertFalse(buf1.hasArray());
+        buf1.release();
+
+        ByteBuf buf2 = alloc.directBuffer();
+        assertEquals(pooledAlloc, buf2.alloc());
+        assertFalse(buf2.hasArray());
+        buf2.release();
+
+        ByteBuf buf3 = alloc.heapBuffer();
+        assertEquals(pooledAlloc, buf3.alloc());
+        assertTrue(buf3.hasArray());
+        buf3.release();
+    }
+
+    /**
+     * With {@link PoolingPolicy#PooledDirect} and no explicit allocator, the builder supplies
+     * a {@link PooledByteBufAllocator} whose number of direct arenas equals the configured
+     * pooling concurrency.
+     */
+    @Test
+    public void testPooledWithDefaultAllocator() {
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.PooledDirect)
+                .poolingConcurrency(3)
+                .build();
+
+        assertTrue(alloc.isDirectBufferPooled());
+
+        ByteBuf buf1 = alloc.buffer();
+        assertEquals(PooledByteBufAllocator.class, buf1.alloc().getClass());
+        assertEquals(3, ((PooledByteBufAllocator) buf1.alloc()).metric().numDirectArenas());
+        assertFalse(buf1.hasArray());
+        buf1.release();
+
+        ByteBuf buf2 = alloc.directBuffer();
+        assertFalse(buf2.hasArray());
+        buf2.release();
+
+        ByteBuf buf3 = alloc.heapBuffer();
+        assertTrue(buf3.hasArray());
+        buf3.release();
+    }
+}
diff --git a/pom.xml b/pom.xml
index 4d210a34af..c13d4878ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
     <module>buildtools</module>
     <module>circe-checksum</module>
     <module>bookkeeper-common</module>
+    <module>bookkeeper-common-allocator</module>
     <module>bookkeeper-stats</module>
     <module>bookkeeper-proto</module>
     <module>bookkeeper-server</module>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to