[ 
https://issues.apache.org/jira/browse/HADOOP-18105?focusedWorklogId=768972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768972
 ]

ASF GitHub Bot logged work on HADOOP-18105:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/22 10:03
            Start Date: 11/May/22 10:03
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r870105560


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically

Review Comment:
   "direct buffers"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends 
ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean 
isDirect) {
+    return isDirect ? directBuffers : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = 
getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = 
getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity.
+    while (true) {
+      Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+      if (!buffersTree.containsKey(keyToInsert)) {
+        buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+        return;
+      }
+    }
+  }
+
+  /**
+   * Clear the buffer pool thus releasing all the buffers.
+   * The caller must remove all references of
+   * existing buffers before calling this method to avoid
+   * memory leaks.
+   */
+  @Override
+  public void release() {

Review Comment:
   synchronized?



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends 
ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean 
isDirect) {
+    return isDirect ? directBuffers : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = 
getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :

Review Comment:
   nit, can you put the ? and : clauses on different lines to make them more 
visible.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends 
ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean 
isDirect) {
+    return isDirect ? directBuffers : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = 
getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = 
getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity.

Review Comment:
   or multisocket systems have clocks slightly out of sync.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool {
+
+  @Test
+  public void testMixedBuffersInPool() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer1 = pool.getBuffer(true, 5);
+    ByteBuffer buffer2 = pool.getBuffer(true, 10);
+    ByteBuffer buffer3 = pool.getBuffer(false, 5);
+    ByteBuffer buffer4 = pool.getBuffer(false, 10);
+    ByteBuffer buffer5 = pool.getBuffer(true, 15);
+
+    assertBufferCounts(pool, 0, 0);
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    assertBufferCounts(pool, 2, 0);
+    pool.putBuffer(buffer3);
+    assertBufferCounts(pool, 2, 1);
+    pool.putBuffer(buffer5);
+    assertBufferCounts(pool, 3, 1);
+    pool.putBuffer(buffer4);
+    assertBufferCounts(pool, 3, 2);
+    pool.release();
+    assertBufferCounts(pool, 0, 0);
+
+  }
+
+  private void assertBufferCounts(WeakReferencedElasticByteBufferPool pool,

Review Comment:
   can you add a javadoc?



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffered returned should be of correct type {}", 
type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer 
to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
+    Assertions.assertThat(buffer4 == buffer2)

Review Comment:
   use equality test



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffered returned should be of correct type {}", 
type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer 
to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
+    Assertions.assertThat(buffer4 == buffer2)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+  }
+
+  @Test
+  public void testGarbageCollection() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    // Before GC.
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+
+    // Removing the references
+    buffer1 = null;
+    buffer2 = null;
+    System.gc();

Review Comment:
   this doesn't reliably trigger a GC; my weak ref test had to create lots of 
memory use before it worked.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffered returned should be of correct type {}", 
type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer 
to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new 
WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)

Review Comment:
   use equality test



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool {

Review Comment:
   1. extend HadoopTestBase with its timeout, thread name etc
   2. add a test where you ask for 0 bytes
   3. add a test where you ask for -1 bytes





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768972)
    Time Spent: 1h 10m  (was: 1h)

> Implement a variant of ElasticByteBufferPool which uses weak references for 
> garbage collection.
> -----------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-18105
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18105
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: common, fs
>            Reporter: Mukund Thakur
>            Assignee: Mukund Thakur
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Currently in hadoop codebase, we have two classes which implements byte 
> buffers pooling.
> One is ElasticByteBufferPool which doesn't use weak references and thus could 
> cause memory leaks in production environment. 
> Other is DirectBufferPool which uses weak references but doesn't support 
> caller's preference for either on-heap or off-heap buffers. 
>  
> The idea is to create an improved version of ElasticByteBufferPool by 
> subclassing it ( as it is marked as public and stable and used widely in hdfs 
> ) with essential functionalities required for effective buffer pooling. This 
> is important for the parent Vectored IO work.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to