[
https://issues.apache.org/jira/browse/HADOOP-18105?focusedWorklogId=774231&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774231
]
ASF GitHub Bot logged work on HADOOP-18105:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/22 21:10
Start Date: 24/May/22 21:10
Worklog Time Spent: 10m
Work Description: steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r880947333
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends
+ ElasticByteBufferPool {
+
+ private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
Review Comment:
1. Add javadocs here and below, and mention that all access must be within synchronized blocks.
2. The field should be of type Map<>, unless it has to be explicitly a TreeMap.
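On point 2: `getBuffer()` calls `ceilingEntry()`, which `Map` does not declare. `NavigableMap` is the narrowest interface that does, so it is probably the most general type the field can have. The sketch below shows such a field with javadoc covering the locking rule from point 1. `BufferIndexSketch`, its `Key`, and both methods are hypothetical stand-ins, not the PR's code.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, trimmed-down pool index. NavigableMap is the narrowest
 * interface that still declares ceilingEntry(), which a plain Map lacks.
 */
final class BufferIndexSketch {

  /** Sort key: capacity first, then insertion time to keep keys unique. */
  static final class Key implements Comparable<Key> {
    final int capacity;
    final long insertionTime;

    Key(int capacity, long insertionTime) {
      this.capacity = capacity;
      this.insertionTime = insertionTime;
    }

    @Override
    public int compareTo(Key other) {
      int byCapacity = Integer.compare(capacity, other.capacity);
      return byCapacity != 0
          ? byCapacity
          : Long.compare(insertionTime, other.insertionTime);
    }
  }

  /**
   * Direct buffers in the pool, indexed by (capacity, insertion time).
   * TreeMap is not thread safe: every access must be made while
   * holding the lock on this object, i.e. inside synchronized methods.
   */
  private final NavigableMap<Key, WeakReference<ByteBuffer>> directBuffers =
      new TreeMap<>();

  synchronized void put(ByteBuffer buffer, long insertionTime) {
    directBuffers.put(new Key(buffer.capacity(), insertionTime),
        new WeakReference<>(buffer));
  }

  /** @return capacity of the smallest entry with capacity >= length, or -1. */
  synchronized int smallestCapacityAtLeast(int length) {
    // Long.MIN_VALUE sorts before every real timestamp, including negative
    // System.nanoTime() values, so entries of exactly `length` qualify.
    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
        directBuffers.ceilingEntry(new Key(length, Long.MIN_VALUE));
    return entry == null ? -1 : entry.getKey().capacity;
  }
}
```

Declaring the field as `Map<>` would not compile here, so the "unless" clause in point 2 seems to apply.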
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java:
##########
@@ -45,4 +45,6 @@ public interface ByteBufferPool {
* @param buffer a direct bytebuffer
*/
void putBuffer(ByteBuffer buffer);
+
+ default void release() { }
Review Comment:
javadoc?
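A minimal sketch of the kind of javadoc the new default method could carry. The wording is only a suggestion. `PoolSketch` is a trimmed, hypothetical stand-in for `ByteBufferPool`, and `NoPoolingSketch` is a hypothetical implementation that relies on the no-op default.

```java
import java.nio.ByteBuffer;

/** Trimmed-down stand-in for ByteBufferPool, to show the suggested javadoc. */
interface PoolSketch {
  ByteBuffer getBuffer(boolean direct, int length);

  void putBuffer(ByteBuffer buffer);

  /**
   * Release any resources held by the pool, such as references to
   * pooled buffers. The default implementation does nothing, so
   * existing implementations are unaffected.
   */
  default void release() { }
}

/** An implementation that does not override release(). */
final class NoPoolingSketch implements PoolSketch {
  @Override
  public ByteBuffer getBuffer(boolean direct, int length) {
    return direct
        ? ByteBuffer.allocateDirect(length)
        : ByteBuffer.allocate(length);
  }

  @Override
  public void putBuffer(ByteBuffer buffer) {
    // Nothing is pooled; the buffer is simply dropped.
  }
}
```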
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends
+ ElasticByteBufferPool {
+
+ private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+ new TreeMap<>();
+
+ private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+ new TreeMap<>();
+
+ private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean
+ isDirect) {
+ return isDirect ? directBuffers : heapBuffers;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param direct whether we want a direct byte buffer or a heap one.
+ * @param length length of requested buffer.
+ * @return returns equal or next greater than capacity buffer from
+ * pool if already available and not garbage collected else creates
+ * a new buffer and return it.
+ */
+ @Override
+ public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+ TreeMap<Key, WeakReference<ByteBuffer>> buffersTree =
+ getBufferTree(direct);
+
+ // Scan the entire tree and remove all weak null references.
+ buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+ Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+ buffersTree.ceilingEntry(new Key(length, 0));
+ // If there is no buffer present in the pool with desired size.
+ if (entry == null) {
+ return direct ? ByteBuffer.allocateDirect(length) :
+ ByteBuffer.allocate(length);
+ }
+ // buffer is available in the pool and not garbage collected.
+ WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+ buffersTree.remove(entry.getKey());
+ ByteBuffer buffer = bufferInPool.get();
+ if (buffer != null) {
+ return buffer;
+ }
+ // buffer was in pool but already got garbage collected.
+ return direct ? ByteBuffer.allocateDirect(length) :
Review Comment:
Sorry, I meant:
```java
return direct
? ByteBuffer.allocateDirect(length)
: ByteBuffer.allocate(length);
```
Put the ? and : at the start of each line to maximise visibility.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+public final class WeakReferencedElasticByteBufferPool extends
+ ElasticByteBufferPool {
Review Comment:
Tag it as @Private @Unstable.
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,93 @@
+package org.apache.hadoop.io;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool
Review Comment:
Add tests for:
* returning a buffer which isn't in the pool
* a null buffer (expecting an NPE somewhere, presumably)
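A sketch of the two requested cases, written as plain checks so it runs without JUnit or Hadoop. `TinyWeakPool` is a hypothetical minimal pool whose `putBuffer()` mirrors the structure of the quoted one (call `clear()`, then store a weak reference). That structure means a null buffer fails on the first line with an NPE, and a buffer that never came from the pool is accepted like any other. In the real test class, the already imported `intercept` from LambdaTestUtils would likely replace the try/catch.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal pool: clear() the buffer, then keep a weak ref. */
final class TinyWeakPool {
  private final List<WeakReference<ByteBuffer>> buffers = new ArrayList<>();

  synchronized void putBuffer(ByteBuffer buffer) {
    buffer.clear(); // throws NullPointerException for a null buffer
    buffers.add(new WeakReference<>(buffer));
  }

  synchronized int size() {
    return buffers.size();
  }
}

/** The two cases from the review comment, as boolean checks. */
final class TinyWeakPoolChecks {
  static boolean nullBufferThrowsNpe() {
    try {
      new TinyWeakPool().putBuffer(null);
      return false;
    } catch (NullPointerException expected) {
      return true;
    }
  }

  static boolean foreignBufferIsAccepted() {
    TinyWeakPool pool = new TinyWeakPool();
    // This buffer was never handed out by the pool; it is indexed anyway.
    ByteBuffer foreign = ByteBuffer.allocate(32);
    foreign.put((byte) 1);
    pool.putBuffer(foreign);
    // clear() reset the position, and the pool now holds one entry.
    return pool.size() == 1 && foreign.position() == 0;
  }
}
```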
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+public final class WeakReferencedElasticByteBufferPool extends
+ ElasticByteBufferPool {
+
+ private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+ new TreeMap<>();
+
+ private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+ new TreeMap<>();
+
+ private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean
+ isDirect) {
+ return isDirect
+ ? directBuffers
+ : heapBuffers;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param direct whether we want a direct byte buffer or a heap one.
+ * @param length length of requested buffer.
+ * @return returns equal or next greater than capacity buffer from
+ * pool if already available and not garbage collected else creates
+ * a new buffer and return it.
+ */
+ @Override
+ public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+ TreeMap<Key, WeakReference<ByteBuffer>> buffersTree =
+ getBufferTree(direct);
+
+ // Scan the entire tree and remove all weak null references.
+ buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+ Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+ buffersTree.ceilingEntry(new Key(length, 0));
+ // If there is no buffer present in the pool with desired size.
+ if (entry == null) {
+ return direct ? ByteBuffer.allocateDirect(length) :
+ ByteBuffer.allocate(length);
+ }
+ // buffer is available in the pool and not garbage collected.
+ WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+ buffersTree.remove(entry.getKey());
+ ByteBuffer buffer = bufferInPool.get();
+ if (buffer != null) {
+ return buffer;
+ }
+ // buffer was in pool but already got garbage collected.
+ return direct ? ByteBuffer.allocateDirect(length) :
+ ByteBuffer.allocate(length);
+ }
+
+ /**
+ * Return buffer to the pool.
+ * @param buffer buffer to be returned.
+ */
+ @Override
+ public synchronized void putBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ TreeMap<Key, WeakReference<ByteBuffer>> buffersTree =
+ getBufferTree(buffer.isDirect());
+ // Buffers are indexed by (capacity, time).
+ // If our key is not unique on the first try, we try again, since the
+ // time will be different. Since we use nanoseconds, it's pretty
+ // unlikely that we'll loop even once, unless the system clock has a
+ // poor granularity or multi-socket systems have clocks slightly out
+ // of sync.
+ while (true) {
+ Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+ if (!buffersTree.containsKey(keyToInsert)) {
+ buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+ return;
+ }
+ }
+ }
+
+ /**
+ * Clear the buffer pool thus releasing all the buffers.
+ * The caller must remove all references of
+ * existing buffers before calling this method to avoid
+ * memory leaks.
+ */
+ @Override
+ public synchronized void release() {
+ heapBuffers.clear();
+ directBuffers.clear();
+ }
+
+ @VisibleForTesting
+ public int getCurrentBuffersCount(boolean isDirect) {
+ return isDirect ? directBuffers.size() : heapBuffers.size();
Review Comment:
* same nit about splitting at ? : boundaries
* does this need to be synchronized?
* javadocs
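On the synchronization question: the trees are plain TreeMaps written inside synchronized methods. An unsynchronized read of `size()` has no happens-before relationship with those writes, so a caller on another thread (e.g. a test) could observe a stale count. Making the getter synchronized is cheap and avoids this. A sketch with the ternary split as requested and a possible javadoc; `CountingPoolSketch` is hypothetical and uses a simple sequence number as the key instead of the PR's `Key`.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical two-tree pool showing only the counting method. */
final class CountingPoolSketch {
  private final NavigableMap<Long, WeakReference<ByteBuffer>> directBuffers =
      new TreeMap<>();
  private final NavigableMap<Long, WeakReference<ByteBuffer>> heapBuffers =
      new TreeMap<>();
  private long sequence;

  synchronized void putBuffer(ByteBuffer buffer) {
    NavigableMap<Long, WeakReference<ByteBuffer>> tree = buffer.isDirect()
        ? directBuffers
        : heapBuffers;
    tree.put(sequence++, new WeakReference<>(buffer));
  }

  /**
   * Get the number of entries currently in the pool, including entries
   * whose buffers may already have been garbage collected.
   * Synchronized so the caller sees writes made under the same lock.
   * @param isDirect count direct buffers if true, heap buffers otherwise.
   * @return the number of entries in the chosen tree.
   */
  synchronized int getCurrentBuffersCount(boolean isDirect) {
    return isDirect
        ? directBuffers.size()
        : heapBuffers.size();
  }
}
```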
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+ /**
+ * Return buffer to the pool.
+ * @param buffer buffer to be returned.
+ */
+ @Override
+ public synchronized void putBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ TreeMap<Key, WeakReference<ByteBuffer>> buffersTree =
+ getBufferTree(buffer.isDirect());
+ // Buffers are indexed by (capacity, time).
+ // If our key is not unique on the first try, we try again, since the
+ // time will be different. Since we use nanoseconds, it's pretty
+ // unlikely that we'll loop even once, unless the system clock has a
+ // poor granularity or multi-socket systems have clocks slightly out
+ // of sync.
+ while (true) {
+ Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+ if (!buffersTree.containsKey(keyToInsert)) {
+ buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+ return;
+ }
+ }
Review Comment:
What if the buffer wasn't found? Log at warning? LogExactlyOnce, maybe?
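LogExactlyOnce is a Hadoop helper that emits a given warning only the first time. The sketch below does not use it. It reproduces the same idea with `java.util.logging` and an `AtomicBoolean` so it runs without Hadoop on the classpath. `WarnOnce` is a hypothetical name, and the comment does not pin down which condition in `putBuffer()` should trigger the warning, so that is left open here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
 * Hypothetical log-once helper in the spirit of Hadoop's LogExactlyOnce:
 * the first warning is logged, later ones are silently dropped.
 */
final class WarnOnce {
  private final AtomicBoolean logged = new AtomicBoolean(false);
  private final Logger log;

  WarnOnce(Logger log) {
    this.log = log;
  }

  /** Log at WARNING the first time only; returns true if it logged. */
  boolean warn(String message) {
    // compareAndSet guarantees a single winner even under contention.
    if (logged.compareAndSet(false, true)) {
      log.warning(message);
      return true;
    }
    return false;
  }
}
```

Holding one instance in a static field would limit the warning to once per JVM rather than once per pool; which is preferable is a judgement call.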
Issue Time Tracking
-------------------
Worklog Id: (was: 774231)
Time Spent: 2.5h (was: 2h 20m)
> Implement a variant of ElasticByteBufferPool which uses weak references for
> garbage collection.
> -----------------------------------------------------------------------------------------------
>
> Key: HADOOP-18105
> URL: https://issues.apache.org/jira/browse/HADOOP-18105
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: common, fs
> Reporter: Mukund Thakur
> Assignee: Mukund Thakur
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Currently the Hadoop codebase has two classes which implement byte
> buffer pooling.
> One is ElasticByteBufferPool, which doesn't use weak references and thus could
> cause memory leaks in production environments.
> The other is DirectBufferPool, which uses weak references but doesn't support
> a caller's preference for either on-heap or off-heap buffers.
>
> The idea is to create an improved version of ElasticByteBufferPool by
> subclassing it (as it is marked as public and stable and used widely in HDFS)
> with the essential functionality required for effective buffer pooling. This
> is important for the parent Vectored IO work.
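A simplified sketch of the idea described above, under stated assumptions: separate trees for direct and heap buffers so the caller's preference is honoured, weak references so pooled buffers nobody else holds can be collected, and a smallest-fit lookup via `ceilingEntry()`. `WeakPoolSketch` is hypothetical and deliberately differs from the PR in two ways: it uses a counter rather than `System.nanoTime()` to keep keys unique, and it skips cleared references lazily during lookup instead of scanning the whole tree.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical weak-referenced pool with separate direct/heap trees. */
final class WeakPoolSketch {
  /** Orders by capacity, then by a unique sequence number. */
  private static final class Key implements Comparable<Key> {
    final int capacity;
    final long sequence;

    Key(int capacity, long sequence) {
      this.capacity = capacity;
      this.sequence = sequence;
    }

    @Override
    public int compareTo(Key o) {
      int c = Integer.compare(capacity, o.capacity);
      return c != 0 ? c : Long.compare(sequence, o.sequence);
    }
  }

  private final NavigableMap<Key, WeakReference<ByteBuffer>> direct =
      new TreeMap<>();
  private final NavigableMap<Key, WeakReference<ByteBuffer>> heap =
      new TreeMap<>();
  private long sequence;

  private NavigableMap<Key, WeakReference<ByteBuffer>> tree(boolean isDirect) {
    return isDirect ? direct : heap;
  }

  /** Smallest pooled buffer with capacity >= length, else a new one. */
  synchronized ByteBuffer getBuffer(boolean isDirect, int length) {
    NavigableMap<Key, WeakReference<ByteBuffer>> tree = tree(isDirect);
    Map.Entry<Key, WeakReference<ByteBuffer>> entry;
    // Each iteration removes one entry, so the loop always terminates.
    while ((entry = tree.ceilingEntry(new Key(length, Long.MIN_VALUE)))
        != null) {
      tree.remove(entry.getKey());
      ByteBuffer buffer = entry.getValue().get();
      if (buffer != null) {
        return buffer; // still strongly reachable: reuse it
      }
      // Referent was garbage collected; try the next candidate.
    }
    return isDirect
        ? ByteBuffer.allocateDirect(length)
        : ByteBuffer.allocate(length);
  }

  synchronized void putBuffer(ByteBuffer buffer) {
    buffer.clear();
    tree(buffer.isDirect()).put(new Key(buffer.capacity(), sequence++),
        new WeakReference<>(buffer));
  }

  synchronized int size(boolean isDirect) {
    return tree(isDirect).size();
  }
}
```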
--
This message was sent by Atlassian Jira
(v8.20.7#820007)