This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6850923cc5 GH-34230: [Java] Call allocation listener on BaseAllocator#wrapForeignAllocation (#34231)
6850923cc5 is described below

commit 6850923cc56c57dac28c85088d9c49789f9ecfdc
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Feb 21 21:04:37 2023 +0800

    GH-34230: [Java] Call allocation listener on BaseAllocator#wrapForeignAllocation (#34231)
    
    #34230
    * Closes: #34230
    
    Lead-authored-by: Hongze Zhang <[email protected]>
    Co-authored-by: Hongze Zhang <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 .../org/apache/arrow/memory/BaseAllocator.java     |  39 +++++++
 .../org/apache/arrow/memory/BufferAllocator.java   |  17 +---
 .../arrow/memory/CountingAllocationListener.java   | 113 +++++++++++++++++++++
 .../org/apache/arrow/memory/TestBaseAllocator.java |  91 +----------------
 .../apache/arrow/memory/TestForeignAllocation.java |  77 ++++++++++++++
 5 files changed, 233 insertions(+), 104 deletions(-)

diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 8d21cef7aa..36bc3433aa 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -233,6 +233,45 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
     listener.onChildRemoved(this, childAllocator);
   }
 
+  @Override
+  public ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
+    assertOpen();
+    final long size = allocation.getSize();
+    listener.onPreAllocation(size);
+    AllocationOutcome outcome = this.allocateBytes(size);
+    if (!outcome.isOk()) {
+      if (listener.onFailedAllocation(size, outcome)) {
+        // Second try, in case the listener can do something about it
+        outcome = this.allocateBytes(size);
+      }
+      if (!outcome.isOk()) {
+        throw new OutOfMemoryException(createErrorMsg(this, size,
+            size), outcome.getDetails());
+      }
+    }
+    try {
+      final AllocationManager manager = new ForeignAllocationManager(this, 
allocation);
+      final BufferLedger ledger = manager.associate(this);
+      final ArrowBuf buf =
+          new ArrowBuf(ledger, /*bufferManager=*/null, size, 
allocation.memoryAddress());
+      buf.writerIndex(size);
+      listener.onAllocation(size);
+      return buf;
+    } catch (Throwable t) {
+      try {
+        releaseBytes(size);
+      } catch (Throwable e) {
+        t.addSuppressed(e);
+      }
+      try {
+        allocation.release0();
+      } catch (Throwable e) {
+        t.addSuppressed(e);
+      }
+      throw t;
+    }
+  }
+
   @Override
   public ArrowBuf buffer(final long initialRequestSize) {
     assertOpen();
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index bb3816d9c4..90a4ef26fb 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -249,21 +249,6 @@ public interface BufferAllocator extends AutoCloseable {
    * @param allocation The underlying allocation.
    */
   default ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
-    try {
-      forceAllocate(allocation.getSize());
-      final AllocationManager manager = new ForeignAllocationManager(this, 
allocation);
-      final BufferLedger ledger = manager.associate(this);
-      final ArrowBuf buf =
-          new ArrowBuf(ledger, /*bufferManager=*/null, allocation.getSize(), 
allocation.memoryAddress());
-      buf.writerIndex(allocation.getSize());
-      return buf;
-    } catch (Throwable t) {
-      try {
-        allocation.release0();
-      } catch (Throwable e) {
-        t.addSuppressed(e);
-      }
-      throw t;
-    }
+    throw new UnsupportedOperationException();
   }
 }
diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/CountingAllocationListener.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/CountingAllocationListener.java
new file mode 100644
index 0000000000..78c78c8ad8
--- /dev/null
+++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/CountingAllocationListener.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * Counting allocation listener.
+ * It counts the number of times it has been invoked, and how much memory allocation it has seen
+ * When set to 'expand on fail', it attempts to expand the associated allocator's limit.
+ */
+final class CountingAllocationListener implements AllocationListener {
+  private int numPreCalls;
+  private int numCalls;
+  private int numReleaseCalls;
+  private int numChildren;
+  private long totalMem;
+  private long currentMem;
+  private boolean expandOnFail;
+  BufferAllocator expandAlloc;
+  long expandLimit;
+
+  CountingAllocationListener() {
+    this.numCalls = 0;
+    this.numChildren = 0;
+    this.totalMem = 0;
+    this.currentMem = 0;
+    this.expandOnFail = false;
+    this.expandAlloc = null;
+    this.expandLimit = 0;
+  }
+
+  @Override
+  public void onPreAllocation(long size) {
+    numPreCalls++;
+  }
+
+  @Override
+  public void onAllocation(long size) {
+    numCalls++;
+    totalMem += size;
+    currentMem += size;
+  }
+
+  @Override
+  public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
+    if (expandOnFail) {
+      expandAlloc.setLimit(expandLimit);
+      return true;
+    }
+    return false;
+  }
+
+
+  @Override
+  public void onRelease(long size) {
+    numReleaseCalls++;
+    currentMem -= size;
+  }
+
+  @Override
+  public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator 
childAllocator) {
+    ++numChildren;
+  }
+
+  @Override
+  public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator 
childAllocator) {
+    --numChildren;
+  }
+
+  void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
+    this.expandOnFail = true;
+    this.expandAlloc = expandAlloc;
+    this.expandLimit = expandLimit;
+  }
+
+  int getNumPreCalls() {
+    return numPreCalls;
+  }
+
+  int getNumReleaseCalls() {
+    return numReleaseCalls;
+  }
+
+  int getNumCalls() {
+    return numCalls;
+  }
+
+  int getNumChildren() {
+    return numChildren;
+  }
+
+  long getTotalMem() {
+    return totalMem;
+  }
+
+  long getCurrentMem() {
+    return currentMem;
+  }
+}
diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index ef49e41785..04c33e5d24 100644
--- a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -438,100 +438,15 @@ public class TestBaseAllocator {
         }).build());
   }
 
-  // Allocation listener
-  // It counts the number of times it has been invoked, and how much memory allocation it has seen
-  // When set to 'expand on fail', it attempts to expand the associated allocator's limit
-  private static final class TestAllocationListener implements 
AllocationListener {
-    private int numPreCalls;
-    private int numCalls;
-    private int numReleaseCalls;
-    private int numChildren;
-    private long totalMem;
-    private boolean expandOnFail;
-    BufferAllocator expandAlloc;
-    long expandLimit;
-
-    TestAllocationListener() {
-      this.numCalls = 0;
-      this.numChildren = 0;
-      this.totalMem = 0;
-      this.expandOnFail = false;
-      this.expandAlloc = null;
-      this.expandLimit = 0;
-    }
-
-    @Override
-    public void onPreAllocation(long size) {
-      numPreCalls++;
-    }
-
-    @Override
-    public void onAllocation(long size) {
-      numCalls++;
-      totalMem += size;
-    }
-
-    @Override
-    public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
-      if (expandOnFail) {
-        expandAlloc.setLimit(expandLimit);
-        return true;
-      }
-      return false;
-    }
-
-
-    @Override
-    public void onRelease(long size) {
-      numReleaseCalls++;
-    }
-
-    @Override
-    public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator 
childAllocator) {
-      ++numChildren;
-    }
-
-    @Override
-    public void onChildRemoved(BufferAllocator parentAllocator, 
BufferAllocator childAllocator) {
-      --numChildren;
-    }
-
-    void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
-      this.expandOnFail = true;
-      this.expandAlloc = expandAlloc;
-      this.expandLimit = expandLimit;
-    }
-
-    int getNumPreCalls() {
-      return numPreCalls;
-    }
-
-    int getNumReleaseCalls() {
-      return numReleaseCalls;
-    }
-
-    int getNumCalls() {
-      return numCalls;
-    }
-
-    int getNumChildren() {
-      return numChildren;
-    }
-
-    long getTotalMem() {
-      return totalMem;
-    }
-  }
-
   @Test
   public void testRootAllocator_listeners() throws Exception {
-    TestAllocationListener l1 = new TestAllocationListener();
+    CountingAllocationListener l1 = new CountingAllocationListener();
     assertEquals(0, l1.getNumPreCalls());
     assertEquals(0, l1.getNumCalls());
     assertEquals(0, l1.getNumReleaseCalls());
     assertEquals(0, l1.getNumChildren());
     assertEquals(0, l1.getTotalMem());
-    TestAllocationListener l2 = new TestAllocationListener();
+    CountingAllocationListener l2 = new CountingAllocationListener();
     assertEquals(0, l2.getNumPreCalls());
     assertEquals(0, l2.getNumCalls());
     assertEquals(0, l2.getNumReleaseCalls());
@@ -590,7 +505,7 @@ public class TestBaseAllocator {
 
   @Test
   public void testRootAllocator_listenerAllocationFail() throws Exception {
-    TestAllocationListener l1 = new TestAllocationListener();
+    CountingAllocationListener l1 = new CountingAllocationListener();
     assertEquals(0, l1.getNumCalls());
     assertEquals(0, l1.getTotalMem());
     // Test attempts to allocate too much from a child whose limit is set to half of the max
diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
index 5e40645e06..ec049ca692 100644
--- a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
+++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
@@ -20,6 +20,9 @@ package org.apache.arrow.memory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.arrow.memory.util.MemoryUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -54,6 +57,80 @@ public class TestForeignAllocation {
     assertEquals(0, allocator.getAllocatedMemory());
   }
 
+  @Test
+  public void wrapForeignAllocationWithAllocationListener() {
+    final long bufferSize = 16;
+
+    final CountingAllocationListener listener = new 
CountingAllocationListener();
+    try (BufferAllocator listenedAllocator =
+        allocator.newChildAllocator("child", listener, 0L, 
allocator.getLimit())) {
+      UnsafeForeignAllocation allocation = new 
UnsafeForeignAllocation(bufferSize);
+      try {
+        assertEquals(0, listenedAllocator.getAllocatedMemory());
+        ArrowBuf buf = listenedAllocator.wrapForeignAllocation(allocation);
+        assertEquals(bufferSize, buf.capacity());
+        assertEquals(16, listener.getCurrentMem());
+        buf.close();
+        assertEquals(0, listener.getCurrentMem());
+        assertTrue(allocation.released);
+      } finally {
+        allocation.release0();
+      }
+      assertEquals(0, listenedAllocator.getAllocatedMemory());
+    }
+    assertEquals(1, listener.getNumPreCalls());
+    assertEquals(1, listener.getNumCalls());
+    assertEquals(1, listener.getNumReleaseCalls());
+    assertEquals(16, listener.getTotalMem());
+  }
+
+  @Test(expected = OutOfMemoryException.class)
+  public void wrapForeignAllocationFailedWithAllocationListener() {
+    final long bufferSize = 16;
+    final long limit = bufferSize - 1;
+
+    final CountingAllocationListener listener = new 
CountingAllocationListener();
+    try (BufferAllocator listenedAllocator =
+        allocator.newChildAllocator("child", listener, 0L, limit)) {
+      UnsafeForeignAllocation allocation = new 
UnsafeForeignAllocation(bufferSize);
+      try {
+        assertEquals(0, listenedAllocator.getAllocatedMemory());
+        ArrowBuf buf = listenedAllocator.wrapForeignAllocation(allocation);
+        assertEquals(bufferSize, buf.capacity());
+        buf.close();
+        assertTrue(allocation.released);
+      } finally {
+        allocation.release0();
+      }
+    }
+  }
+
+  @Test
+  public void wrapForeignAllocationWithAllocationListenerReclaimingSpace() {
+    final long bufferSize = 16;
+    final long limit = 2 * bufferSize - 1;
+
+    final List<ArrowBuf> buffersToBeFreed = new ArrayList<>();
+    final AllocationListener listener = new AllocationListener() {
+      @Override
+      public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
+        buffersToBeFreed.forEach(ArrowBuf::close);
+        return true;
+      }
+    };
+
+    try (BufferAllocator listenedAllocator =
+        allocator.newChildAllocator("child", listener, 0L, limit)) {
+      final ArrowBuf buffer1 = listenedAllocator.buffer(bufferSize);
+      buffersToBeFreed.add(buffer1);
+      UnsafeForeignAllocation allocation = new 
UnsafeForeignAllocation(bufferSize);
+      try (final ArrowBuf buffer2 = 
listenedAllocator.wrapForeignAllocation(allocation)) {
+        assertEquals(bufferSize, buffer2.capacity());
+        assertEquals(0, buffer1.getReferenceManager().getRefCount()); // 
buffer1 was closed by listener
+      }
+    }
+  }
+
   private static class UnsafeForeignAllocation extends ForeignAllocation {
     boolean released = false;
 

Reply via email to