This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6850923cc5 GH-34230: [Java] Call allocation listener on
BaseAllocator#wrapForeignAllocation (#34231)
6850923cc5 is described below
commit 6850923cc56c57dac28c85088d9c49789f9ecfdc
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Feb 21 21:04:37 2023 +0800
GH-34230: [Java] Call allocation listener on
BaseAllocator#wrapForeignAllocation (#34231)
#34230
* Closes: #34230
Lead-authored-by: Hongze Zhang <[email protected]>
Co-authored-by: Hongze Zhang <[email protected]>
Signed-off-by: David Li <[email protected]>
---
.../org/apache/arrow/memory/BaseAllocator.java | 39 +++++++
.../org/apache/arrow/memory/BufferAllocator.java | 17 +---
.../arrow/memory/CountingAllocationListener.java | 113 +++++++++++++++++++++
.../org/apache/arrow/memory/TestBaseAllocator.java | 91 +----------------
.../apache/arrow/memory/TestForeignAllocation.java | 77 ++++++++++++++
5 files changed, 233 insertions(+), 104 deletions(-)
diff --git
a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 8d21cef7aa..36bc3433aa 100644
---
a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++
b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -233,6 +233,45 @@ abstract class BaseAllocator extends Accountant implements
BufferAllocator {
listener.onChildRemoved(this, childAllocator);
}
+ @Override
+ public ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
+ assertOpen();
+ final long size = allocation.getSize();
+ listener.onPreAllocation(size);
+ AllocationOutcome outcome = this.allocateBytes(size);
+ if (!outcome.isOk()) {
+ if (listener.onFailedAllocation(size, outcome)) {
+ // Second try, in case the listener can do something about it
+ outcome = this.allocateBytes(size);
+ }
+ if (!outcome.isOk()) {
+ throw new OutOfMemoryException(createErrorMsg(this, size,
+ size), outcome.getDetails());
+ }
+ }
+ try {
+ final AllocationManager manager = new ForeignAllocationManager(this,
allocation);
+ final BufferLedger ledger = manager.associate(this);
+ final ArrowBuf buf =
+ new ArrowBuf(ledger, /*bufferManager=*/null, size,
allocation.memoryAddress());
+ buf.writerIndex(size);
+ listener.onAllocation(size);
+ return buf;
+ } catch (Throwable t) {
+ try {
+ releaseBytes(size);
+ } catch (Throwable e) {
+ t.addSuppressed(e);
+ }
+ try {
+ allocation.release0();
+ } catch (Throwable e) {
+ t.addSuppressed(e);
+ }
+ throw t;
+ }
+ }
+
@Override
public ArrowBuf buffer(final long initialRequestSize) {
assertOpen();
diff --git
a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index bb3816d9c4..90a4ef26fb 100644
---
a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++
b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -249,21 +249,6 @@ public interface BufferAllocator extends AutoCloseable {
* @param allocation The underlying allocation.
*/
default ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
- try {
- forceAllocate(allocation.getSize());
- final AllocationManager manager = new ForeignAllocationManager(this,
allocation);
- final BufferLedger ledger = manager.associate(this);
- final ArrowBuf buf =
- new ArrowBuf(ledger, /*bufferManager=*/null, allocation.getSize(),
allocation.memoryAddress());
- buf.writerIndex(allocation.getSize());
- return buf;
- } catch (Throwable t) {
- try {
- allocation.release0();
- } catch (Throwable e) {
- t.addSuppressed(e);
- }
- throw t;
- }
+ throw new UnsupportedOperationException();
}
}
diff --git
a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/CountingAllocationListener.java
b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/CountingAllocationListener.java
new file mode 100644
index 0000000000..78c78c8ad8
--- /dev/null
+++
b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/CountingAllocationListener.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * Counting allocation listener.
+ * It counts the number of times it has been invoked, and how much memory
allocation it has seen
+ * When set to 'expand on fail', it attempts to expand the associated
allocator's limit.
+ */
+final class CountingAllocationListener implements AllocationListener {
+ private int numPreCalls;
+ private int numCalls;
+ private int numReleaseCalls;
+ private int numChildren;
+ private long totalMem;
+ private long currentMem;
+ private boolean expandOnFail;
+ BufferAllocator expandAlloc;
+ long expandLimit;
+
+ CountingAllocationListener() {
+ this.numCalls = 0;
+ this.numChildren = 0;
+ this.totalMem = 0;
+ this.currentMem = 0;
+ this.expandOnFail = false;
+ this.expandAlloc = null;
+ this.expandLimit = 0;
+ }
+
+ @Override
+ public void onPreAllocation(long size) {
+ numPreCalls++;
+ }
+
+ @Override
+ public void onAllocation(long size) {
+ numCalls++;
+ totalMem += size;
+ currentMem += size;
+ }
+
+ @Override
+ public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
+ if (expandOnFail) {
+ expandAlloc.setLimit(expandLimit);
+ return true;
+ }
+ return false;
+ }
+
+
+ @Override
+ public void onRelease(long size) {
+ numReleaseCalls++;
+ currentMem -= size;
+ }
+
+ @Override
+ public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator
childAllocator) {
+ ++numChildren;
+ }
+
+ @Override
+ public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator
childAllocator) {
+ --numChildren;
+ }
+
+ void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
+ this.expandOnFail = true;
+ this.expandAlloc = expandAlloc;
+ this.expandLimit = expandLimit;
+ }
+
+ int getNumPreCalls() {
+ return numPreCalls;
+ }
+
+ int getNumReleaseCalls() {
+ return numReleaseCalls;
+ }
+
+ int getNumCalls() {
+ return numCalls;
+ }
+
+ int getNumChildren() {
+ return numChildren;
+ }
+
+ long getTotalMem() {
+ return totalMem;
+ }
+
+ long getCurrentMem() {
+ return currentMem;
+ }
+}
diff --git
a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index ef49e41785..04c33e5d24 100644
---
a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++
b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -438,100 +438,15 @@ public class TestBaseAllocator {
}).build());
}
- // Allocation listener
- // It counts the number of times it has been invoked, and how much memory
allocation it has seen
- // When set to 'expand on fail', it attempts to expand the associated
allocator's limit
- private static final class TestAllocationListener implements
AllocationListener {
- private int numPreCalls;
- private int numCalls;
- private int numReleaseCalls;
- private int numChildren;
- private long totalMem;
- private boolean expandOnFail;
- BufferAllocator expandAlloc;
- long expandLimit;
-
- TestAllocationListener() {
- this.numCalls = 0;
- this.numChildren = 0;
- this.totalMem = 0;
- this.expandOnFail = false;
- this.expandAlloc = null;
- this.expandLimit = 0;
- }
-
- @Override
- public void onPreAllocation(long size) {
- numPreCalls++;
- }
-
- @Override
- public void onAllocation(long size) {
- numCalls++;
- totalMem += size;
- }
-
- @Override
- public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
- if (expandOnFail) {
- expandAlloc.setLimit(expandLimit);
- return true;
- }
- return false;
- }
-
-
- @Override
- public void onRelease(long size) {
- numReleaseCalls++;
- }
-
- @Override
- public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator
childAllocator) {
- ++numChildren;
- }
-
- @Override
- public void onChildRemoved(BufferAllocator parentAllocator,
BufferAllocator childAllocator) {
- --numChildren;
- }
-
- void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
- this.expandOnFail = true;
- this.expandAlloc = expandAlloc;
- this.expandLimit = expandLimit;
- }
-
- int getNumPreCalls() {
- return numPreCalls;
- }
-
- int getNumReleaseCalls() {
- return numReleaseCalls;
- }
-
- int getNumCalls() {
- return numCalls;
- }
-
- int getNumChildren() {
- return numChildren;
- }
-
- long getTotalMem() {
- return totalMem;
- }
- }
-
@Test
public void testRootAllocator_listeners() throws Exception {
- TestAllocationListener l1 = new TestAllocationListener();
+ CountingAllocationListener l1 = new CountingAllocationListener();
assertEquals(0, l1.getNumPreCalls());
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getNumReleaseCalls());
assertEquals(0, l1.getNumChildren());
assertEquals(0, l1.getTotalMem());
- TestAllocationListener l2 = new TestAllocationListener();
+ CountingAllocationListener l2 = new CountingAllocationListener();
assertEquals(0, l2.getNumPreCalls());
assertEquals(0, l2.getNumCalls());
assertEquals(0, l2.getNumReleaseCalls());
@@ -590,7 +505,7 @@ public class TestBaseAllocator {
@Test
public void testRootAllocator_listenerAllocationFail() throws Exception {
- TestAllocationListener l1 = new TestAllocationListener();
+ CountingAllocationListener l1 = new CountingAllocationListener();
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getTotalMem());
// Test attempts to allocate too much from a child whose limit is set to
half of the max
diff --git
a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
index 5e40645e06..ec049ca692 100644
---
a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
+++
b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java
@@ -20,6 +20,9 @@ package org.apache.arrow.memory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.arrow.memory.util.MemoryUtil;
import org.junit.After;
import org.junit.Before;
@@ -54,6 +57,80 @@ public class TestForeignAllocation {
assertEquals(0, allocator.getAllocatedMemory());
}
+ @Test
+ public void wrapForeignAllocationWithAllocationListener() {
+ final long bufferSize = 16;
+
+ final CountingAllocationListener listener = new
CountingAllocationListener();
+ try (BufferAllocator listenedAllocator =
+ allocator.newChildAllocator("child", listener, 0L,
allocator.getLimit())) {
+ UnsafeForeignAllocation allocation = new
UnsafeForeignAllocation(bufferSize);
+ try {
+ assertEquals(0, listenedAllocator.getAllocatedMemory());
+ ArrowBuf buf = listenedAllocator.wrapForeignAllocation(allocation);
+ assertEquals(bufferSize, buf.capacity());
+ assertEquals(16, listener.getCurrentMem());
+ buf.close();
+ assertEquals(0, listener.getCurrentMem());
+ assertTrue(allocation.released);
+ } finally {
+ allocation.release0();
+ }
+ assertEquals(0, listenedAllocator.getAllocatedMemory());
+ }
+ assertEquals(1, listener.getNumPreCalls());
+ assertEquals(1, listener.getNumCalls());
+ assertEquals(1, listener.getNumReleaseCalls());
+ assertEquals(16, listener.getTotalMem());
+ }
+
+ @Test(expected = OutOfMemoryException.class)
+ public void wrapForeignAllocationFailedWithAllocationListener() {
+ final long bufferSize = 16;
+ final long limit = bufferSize - 1;
+
+ final CountingAllocationListener listener = new
CountingAllocationListener();
+ try (BufferAllocator listenedAllocator =
+ allocator.newChildAllocator("child", listener, 0L, limit)) {
+ UnsafeForeignAllocation allocation = new
UnsafeForeignAllocation(bufferSize);
+ try {
+ assertEquals(0, listenedAllocator.getAllocatedMemory());
+ ArrowBuf buf = listenedAllocator.wrapForeignAllocation(allocation);
+ assertEquals(bufferSize, buf.capacity());
+ buf.close();
+ assertTrue(allocation.released);
+ } finally {
+ allocation.release0();
+ }
+ }
+ }
+
+ @Test
+ public void wrapForeignAllocationWithAllocationListenerReclaimingSpace() {
+ final long bufferSize = 16;
+ final long limit = 2 * bufferSize - 1;
+
+ final List<ArrowBuf> buffersToBeFreed = new ArrayList<>();
+ final AllocationListener listener = new AllocationListener() {
+ @Override
+ public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
+ buffersToBeFreed.forEach(ArrowBuf::close);
+ return true;
+ }
+ };
+
+ try (BufferAllocator listenedAllocator =
+ allocator.newChildAllocator("child", listener, 0L, limit)) {
+ final ArrowBuf buffer1 = listenedAllocator.buffer(bufferSize);
+ buffersToBeFreed.add(buffer1);
+ UnsafeForeignAllocation allocation = new
UnsafeForeignAllocation(bufferSize);
+ try (final ArrowBuf buffer2 =
listenedAllocator.wrapForeignAllocation(allocation)) {
+ assertEquals(bufferSize, buffer2.capacity());
+ assertEquals(0, buffer1.getReferenceManager().getRefCount()); //
buffer1 was closed by listener
+ }
+ }
+ }
+
private static class UnsafeForeignAllocation extends ForeignAllocation {
boolean released = false;