smengcl commented on code in PR #6859:
URL: https://github.com/apache/ozone/pull/6859#discussion_r1676476575


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java:
##########
@@ -67,81 +79,134 @@ public Function<ByteBuffer, ByteString> 
byteStringConversion() {
   }
 
   ChunkBuffer getCurrentBuffer() {
-    return currentBufferIndex == -1 ? null : 
bufferList.get(currentBufferIndex);
+    return doInLock(() -> currentBuffer);
   }
 
   /**
-   * If the currentBufferIndex is less than the buffer size - 1,
-   * it means, the next buffer in the list has been freed up for
-   * rewriting. Reuse the next available buffer in such cases.
-   * <p>
-   * In case, the currentBufferIndex == buffer.size and buffer size is still
-   * less than the capacity to be allocated, just allocate a buffer of size
-   * chunk size.
+   * Allocate a new {@link ChunkBuffer}, waiting for a buffer to be released 
when this pool already allocates at
+   * capacity.
    */
-  public ChunkBuffer allocateBuffer(int increment) {
-    final int nextBufferIndex = currentBufferIndex + 1;
-
-    Preconditions.assertTrue(nextBufferIndex < capacity, () ->
-        "next index: " + nextBufferIndex + " >= capacity: " + capacity);
-
-    currentBufferIndex = nextBufferIndex;
-
-    if (currentBufferIndex < bufferList.size()) {
-      return getBuffer(currentBufferIndex);
-    } else {
-      final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize, 
increment);
-      bufferList.add(newBuffer);
-      return newBuffer;
+  public ChunkBuffer allocateBuffer(int increment) throws InterruptedException 
{
+    lock.lockInterruptibly();
+    try {
+      Preconditions.assertTrue(allocated.size() + released.size() <= capacity, 
() ->
+          "Total created buffer must not exceed capacity.");
+
+      while (allocated.size() == capacity) {
+        LOG.debug("Allocation needs to wait the pool is at capacity (allocated 
= capacity = {}).", capacity);
+        notFull.await();
+      }
+      // Get a buffer to allocate, preferably from the released ones.
+      final ChunkBuffer buffer = released.isEmpty() ?
+          ChunkBuffer.allocate(bufferSize, increment) : released.removeFirst();
+      allocated.add(buffer);
+      currentBuffer = buffer;
+
+      LOG.debug("Allocated new buffer {}, number of used buffers {}, capacity 
{}.",
+          buffer, allocated.size(), capacity);
+      return buffer;
+    } finally {
+      lock.unlock();
     }
   }
 
-  void releaseBuffer(ChunkBuffer chunkBuffer) {
-    Preconditions.assertTrue(!bufferList.isEmpty(), "empty buffer list");
-    Preconditions.assertSame(bufferList.get(0), chunkBuffer,
-        "only the first buffer can be released");
-    Preconditions.assertTrue(currentBufferIndex >= 0,
-        () -> "current buffer: " + currentBufferIndex);
+  void releaseBuffer(ChunkBuffer buffer) {
+    LOG.debug("Releasing buffer {}", buffer);
+    lock.lock();
+    try {
+      Preconditions.assertTrue(removeByIdentity(allocated, buffer), "Releasing 
unknown buffer");
+      buffer.clear();
+      released.add(buffer);
+      if (buffer == currentBuffer) {
+        currentBuffer = null;
+      }
+      notFull.signal();

Review Comment:
   ~~wait it looks like there could be at most one thread await()ing because of 
the `lock`~~ `notFull.await()` will release the `lock`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to