[
https://issues.apache.org/jira/browse/HADOOP-18105?focusedWorklogId=771645&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-771645
]
ASF GitHub Bot logged work on HADOOP-18105:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/May/22 00:00
Start Date: 18/May/22 00:00
Worklog Time Spent: 10m
Work Description: mukund-thakur commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r875361776
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+ private final boolean isDirect;
+
+ private final String type;
+
+ @Parameterized.Parameters(name = "Buffer type : {0}")
+ public static List<String> params() {
+ return Arrays.asList("direct", "array");
+ }
+
+ public TestWeakReferencedElasticByteBufferPool(String type) {
+ this.type = type;
+ this.isDirect = !"array".equals(type);
+ }
+
+ // Add more tests for different time and same size buffers in the pool.
+ @Test
+ public void testGetAndPutBasic() {
+ WeakReferencedElasticByteBufferPool pool = new
WeakReferencedElasticByteBufferPool();
+ int bufferSize = 5;
+ ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+ Assertions.assertThat(buffer.isDirect())
+ .describedAs("Buffered returned should be of correct type {}",
type)
+ .isEqualTo(isDirect);
+ Assertions.assertThat(buffer.capacity())
+ .describedAs("Initial capacity of returned buffer from pool")
+ .isEqualTo(bufferSize);
+ Assertions.assertThat(buffer.position())
+ .describedAs("Initial position of returned buffer from pool")
+ .isEqualTo(0);
+
+ byte[] arr = createByteArray(bufferSize);
+ buffer.put(arr, 0, arr.length);
+ buffer.flip();
+ validateBufferContent(buffer, arr);
+ Assertions.assertThat(buffer.position())
+ .describedAs("Buffer's position after filling bytes in it")
+ .isEqualTo(bufferSize);
+ // releasing buffer to the pool.
+ pool.putBuffer(buffer);
+ Assertions.assertThat(buffer.position())
+ .describedAs("Position should be reset to 0 after returning buffer
to the pool")
+ .isEqualTo(0);
+
+ }
+
+ @Test
+ public void testPoolingWithDifferentSizes() {
+ WeakReferencedElasticByteBufferPool pool = new
WeakReferencedElasticByteBufferPool();
+ ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+ ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+ ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(0);
+
+ pool.putBuffer(buffer1);
+ pool.putBuffer(buffer2);
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(2);
+ ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+ Assertions.assertThat(buffer3.capacity())
+ .describedAs("Pooled buffer should have older capacity")
+ .isEqualTo(15);
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(1);
+ pool.putBuffer(buffer);
+ ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+ Assertions.assertThat(buffer4.capacity())
+ .describedAs("Pooled buffer should have older capacity")
+ .isEqualTo(10);
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(1);
+
+ pool.release();
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool post release")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testPoolingWithDifferentInsertionTime() {
+ WeakReferencedElasticByteBufferPool pool = new
WeakReferencedElasticByteBufferPool();
+ ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+ ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+ ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(0);
+
+ pool.putBuffer(buffer1);
+ pool.putBuffer(buffer2);
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(2);
+ ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+ // As buffer1 is returned to the pool before buffer2, it should
+ // be returned when buffer of same size is asked again from
+ // the pool.
+ Assertions.assertThat(buffer3 == buffer1)
+ .describedAs("Buffers should be returned in order of their " +
+ "insertion time")
+ .isTrue();
+ pool.putBuffer(buffer);
+ ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
+ Assertions.assertThat(buffer4 == buffer2)
+ .describedAs("Buffers should be returned in order of their " +
+ "insertion time")
+ .isTrue();
+ }
+
+ @Test
+ public void testGarbageCollection() {
+ WeakReferencedElasticByteBufferPool pool = new
WeakReferencedElasticByteBufferPool();
+ ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+ ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+ ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(0);
+ pool.putBuffer(buffer1);
+ pool.putBuffer(buffer2);
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+ .describedAs("Number of buffers in the pool")
+ .isEqualTo(2);
+ // Before GC.
+ ByteBuffer buffer4 = pool.getBuffer(isDirect, 12);
+ Assertions.assertThat(buffer4.capacity())
+ .describedAs("Pooled buffer should have older capacity")
+ .isEqualTo(15);
+
+ // Removing the references
+ buffer1 = null;
+ buffer2 = null;
+ System.gc();
Review Comment:
Oh, I see. I just revisited your code and noticed that many managers are created
in the test.
It works fine for me, though: every run gives consistent results.
Issue Time Tracking
-------------------
Worklog Id: (was: 771645)
Time Spent: 1.5h (was: 1h 20m)
> Implement a variant of ElasticByteBufferPool which uses weak references for
> garbage collection.
> -----------------------------------------------------------------------------------------------
>
> Key: HADOOP-18105
> URL: https://issues.apache.org/jira/browse/HADOOP-18105
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: common, fs
> Reporter: Mukund Thakur
> Assignee: Mukund Thakur
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Currently, the Hadoop codebase has two classes that implement byte
> buffer pooling.
> One is ElasticByteBufferPool, which doesn't use weak references and thus could
> cause memory leaks in production environments.
> The other is DirectBufferPool, which uses weak references but doesn't support
> the caller's preference for either on-heap or off-heap buffers.
>
> The idea is to create an improved version of ElasticByteBufferPool by
> subclassing it (as it is marked public and stable and is widely used in HDFS)
> and adding the essential functionality required for effective buffer pooling.
> This is important for the parent Vectored IO work.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]