This is an automated email from the ASF dual-hosted git repository. leerho pushed a commit to branch update_memReqSvr in repository https://gitbox.apache.org/repos/asf/datasketches-memory.git
commit 9d29ac90092a9db048773045f8059523a50efc0d Author: Lee Rhodes <[email protected]> AuthorDate: Sat Feb 15 17:52:13 2025 -0800 Update how Memory Request Server works. --- .../memory/DefaultMemoryRequestServer.java | 110 ++++++++++++++++++--- .../org/apache/datasketches/memory/Memory.java | 14 +-- .../datasketches/memory/MemoryRequestServer.java | 28 ++---- .../org/apache/datasketches/memory/Resource.java | 9 +- .../apache/datasketches/memory/WritableMemory.java | 40 +++++--- .../datasketches/memory/internal/ResourceImpl.java | 9 +- .../memory/internal/WritableMemoryImpl.java | 25 ++++- .../memory/internal/AllocateDirectMemoryTest.java | 65 ++++++++++-- .../memory/internal/DruidIssue11544Test.java | 12 +-- .../internal/ExampleMemoryRequestServerTest.java | 16 ++- 10 files changed, 246 insertions(+), 82 deletions(-) diff --git a/src/main/java/org/apache/datasketches/memory/DefaultMemoryRequestServer.java b/src/main/java/org/apache/datasketches/memory/DefaultMemoryRequestServer.java index d288eee5..1077cfe4 100644 --- a/src/main/java/org/apache/datasketches/memory/DefaultMemoryRequestServer.java +++ b/src/main/java/org/apache/datasketches/memory/DefaultMemoryRequestServer.java @@ -27,39 +27,121 @@ import java.nio.ByteOrder; * manage continuous requests for larger or smaller memory. * This capability is only available for writable, non-file-memory-mapping resources. * + * <p>The operation of this implementation is controlled by three conditions:</p> + * <ul> + * <li><b><i>origOffHeap:</i></b> If <i>true</i>, the originally allocated WritableMemory is off-heap.</li> + * + * <li><b><i>oneArena:</i></b> If <i>true</i>, all subsequent off-heap allocations will use the same Arena + * obtained from the original off-heap WritableMemory. Otherwise, subsequent off-heap allocations will + * use a new confined Arena created by this implementation.</li> + * + * <li><b><i>offHeap:</i></b> If <i>true</i>, all subsequent allocations will be off-heap. + * If the originally allocated WritableMemory is on-heap, this variable is ignored.</li> + * </ul> + * + * <p>These three variables work together as follows:</p> + * + * <ul> + * <li>If the original WritableMemory is on-heap, all subsequent allocations will also be on-heap.</li> + * + * <li>If <i>origOffHeap</i> = <i>true</i>, <i>oneArena</i> = <i>true</i>, and <i>offHeap</i> = <i>true</i>, + * all subsequent allocations will also be off-heap and associated with the original Arena. + * It is the responsibility of the user to close the original Arena using a Try-With-Resource block, or directly.</li> + * + * <li>If the original WritableMemory is off-heap, <i>oneArena</i> is true, and <i>offHeap</i> is false, + * all subsequent allocations will be on-heap. + * It is the responsibility of the user to close the original Arena using a Try-With-Resource block, or directly.</li> + * + * <li>If the original WritableMemory is off-heap, <i>oneArena</i> is false, and <i>offHeap</i> is true, + * all subsequent allocations will also be off-heap and associated with a new confined Arena assigned by this implementation. + * It is the responsibility of the user to close the original Arena using a Try-With-Resource block, or directly, + * and close the last returned new WritableMemory directly.</li> + * </ul> + * + * <p>In summary:</p> + * + * <table> <caption><b>Configuration Options</b></caption> + * <tr><th>Original Off-Heap</th> <th>OneArena</th> <th>OffHeap</th> <th>Subsequent Allocations</th></tr> + * <tr><td>false</td> <td>N/A</td> <td>N/A</td> <td>All on-heap</td></tr> + * <tr><td>true</td> <td>N/A</td> <td>false</td> <td>All on-heap</td></tr> + * <tr><td>true</td> <td>true</td> <td>true</td> <td>All off-heap in original Arena</td></tr> + * <tr><td>true</td> <td>false</td> <td>true</td> <td>All off-heap in separate Arenas</td></tr> + * </table> + * * @author Lee Rhodes */ +@SuppressWarnings("resource") //can't use TWRs here public final class DefaultMemoryRequestServer implements MemoryRequestServer { + private final long alignmentBytes; + private final ByteOrder byteOrder; + private final boolean oneArena; + private final boolean offHeap; /** * Default constructor. */ - public DefaultMemoryRequestServer() { } + public DefaultMemoryRequestServer() { + alignmentBytes = 8; + byteOrder = ByteOrder.nativeOrder(); + oneArena = false; + offHeap = false; + } - @Override - public WritableMemory request( - final long newCapacityBytes, + /** + * Optional constructor 1. + * @param oneArena if true, the original arena will be used for all requested allocations. + * @param offHeap if true, new allocations will be off-heap. + */ + public DefaultMemoryRequestServer( + final boolean oneArena, + final boolean offHeap) { + this.alignmentBytes = 8; + this.byteOrder = ByteOrder.nativeOrder(); + this.oneArena = oneArena; + this.offHeap = offHeap; + } + + /** + * Optional constructor 2. + * @param alignmentBytes requested segment alignment for all allocations. Typically 1, 2, 4 or 8. + * @param byteOrder the given <i>ByteOrder</i>. It must be non-null. + * @param oneArena if true, the same arena will be used for all requested allocations. + * @param offHeap if true, new allocations will be off-heap. + */ + public DefaultMemoryRequestServer( final long alignmentBytes, final ByteOrder byteOrder, - final Arena arena) { - final WritableMemory newWmem; + final boolean oneArena, + final boolean offHeap) { + this.alignmentBytes = alignmentBytes; + this.byteOrder = byteOrder; + this.oneArena = oneArena; + this.offHeap = offHeap; + } - if (arena != null) { - newWmem = WritableMemory.allocateDirect(newCapacityBytes, alignmentBytes, byteOrder, this, arena); - } - else { //On-heap + @Override + public WritableMemory request( + final WritableMemory oldWmem, + final long newCapacityBytes) { + + //On-heap + if (oldWmem.getArena() == null || !offHeap) { if (newCapacityBytes > Integer.MAX_VALUE) { throw new IllegalArgumentException("Requested capacity exceeds Integer.MAX_VALUE."); } - newWmem = WritableMemory.allocate((int)newCapacityBytes, byteOrder, this); + return WritableMemory.allocate((int)newCapacityBytes, byteOrder, this); } - return newWmem; + //Acquire Arena + final Arena arena = (oneArena) ? oldWmem.getArena() : Arena.ofConfined(); + return WritableMemory.allocateDirect(newCapacityBytes, alignmentBytes, byteOrder, this, arena); } @Override - public void requestClose(final Arena arena) { - if (arena.scope().isAlive()) { arena.close(); } + public void requestClose(final WritableMemory wmemToClose) { + final Arena arena = wmemToClose.getArena(); + if (oneArena || arena == null || !arena.scope().isAlive()) { return; } //can't close + arena.close(); } } diff --git a/src/main/java/org/apache/datasketches/memory/Memory.java b/src/main/java/org/apache/datasketches/memory/Memory.java index 071e8167..3ef3d341 100644 --- a/src/main/java/org/apache/datasketches/memory/Memory.java +++ b/src/main/java/org/apache/datasketches/memory/Memory.java @@ -222,7 +222,7 @@ public interface Memory extends Resource { int lengthBytes, ByteOrder byteOrder) { final MemorySegment slice = MemorySegment.ofArray(array).asSlice(offsetBytes, lengthBytes).asReadOnly(); - return WritableMemoryImpl.wrapSegmentAsArray(slice, byteOrder, null); + return WritableMemoryImpl.wrapSegment(slice, byteOrder); } //intentionally removed wrap(boolean[]) @@ -234,7 +234,7 @@ public interface Memory extends Resource { */ static Memory wrap(char[] array) { final MemorySegment seg = MemorySegment.ofArray(array).asReadOnly(); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -244,7 +244,7 @@ public interface Memory extends Resource { */ static Memory wrap(short[] array) { final MemorySegment seg = MemorySegment.ofArray(array).asReadOnly(); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -254,7 +254,7 @@ public interface Memory extends Resource { */ static Memory wrap(int[] array) { final MemorySegment seg = MemorySegment.ofArray(array).asReadOnly(); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -264,7 +264,7 @@ public interface Memory extends Resource { */ static Memory wrap(long[] array) { final MemorySegment seg = MemorySegment.ofArray(array).asReadOnly(); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -274,7 +274,7 @@ public interface Memory extends Resource { */ static Memory wrap(float[] array) { final MemorySegment seg = MemorySegment.ofArray(array).asReadOnly(); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -284,7 +284,7 @@ public interface Memory extends Resource { */ static Memory wrap(double[] array) { final MemorySegment seg = MemorySegment.ofArray(array).asReadOnly(); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } //END OF CONSTRUCTOR-TYPE METHODS diff --git a/src/main/java/org/apache/datasketches/memory/MemoryRequestServer.java b/src/main/java/org/apache/datasketches/memory/MemoryRequestServer.java index e6de60ac..cbeaea9a 100644 --- a/src/main/java/org/apache/datasketches/memory/MemoryRequestServer.java +++ b/src/main/java/org/apache/datasketches/memory/MemoryRequestServer.java @@ -19,9 +19,6 @@ package org.apache.datasketches.memory; -import java.lang.foreign.Arena; -import java.nio.ByteOrder; - /** * The MemoryRequestServer is a callback interface to provide a means to request more or less memory * for heap and off-heap WritableMemory resources that are not file-memory-mapped backed resources. @@ -33,26 +30,21 @@ import java.nio.ByteOrder; public interface MemoryRequestServer { /** - * Request new WritableMemory with the given newCapacityBytes. + * Request a new WritableMemory with the given newCapacityBytes. + * @param oldWmem the previous WritableMemory to be possibly closed and which provides an associated Arena + * that may be used for allocating the new WritableMemory. + * If the arena is null, the requested WritableMemory will be on-heap. * @param newCapacityBytes The capacity being requested. - * @param alignmentBytes requested segment alignment. Typically 1, 2, 4 or 8. - * @param byteOrder the given <i>ByteOrder</i>. It must be non-null. - * @param arena the given arena to manage the new off-heap WritableMemory. - * If arena is null, the requested WritableMemory will be on-heap. - * Warning: This class is not thread-safe. Specifying an Arena that allows multiple threads is not recommended. + * * @return new WritableMemory with the requested capacity. */ - WritableMemory request( - long newCapacityBytes, - long alignmentBytes, - ByteOrder byteOrder, - Arena arena); + WritableMemory request(WritableMemory oldWmem, long newCapacityBytes); /** - * Request to close the area managing all the related resources, if applicable. - * Be careful when you request to close the given Arena, you may be closing other resources as well. - * @param arena the given arena to use to close all its managed resources. + * Request to close the given WritableMemory. If applicable, it will be closed by its associated Arena. + * Be careful. Closing the associated Arena may be closing other resources as well. + * @param wmemToClose the given WritableMemory to close. */ - void requestClose( Arena arena); + void requestClose(WritableMemory wmemToClose); } diff --git a/src/main/java/org/apache/datasketches/memory/Resource.java b/src/main/java/org/apache/datasketches/memory/Resource.java index 701b31ec..9740b4b0 100644 --- a/src/main/java/org/apache/datasketches/memory/Resource.java +++ b/src/main/java/org/apache/datasketches/memory/Resource.java @@ -36,8 +36,9 @@ public interface Resource { /** * The default MemoryRequestServer used primarily by test. + * Do not allocate requested memory off-heap. */ - static final MemoryRequestServer defaultMemReqSvr = new DefaultMemoryRequestServer(); + static final MemoryRequestServer defaultMemReqSvr = new DefaultMemoryRequestServer(8, ByteOrder.nativeOrder(), false, false); /** * Gets the {@link MemoryRequestServer} to request additional memory @@ -253,9 +254,10 @@ public interface Resource { boolean isRegion(); /** - * Returns true if the underlying resource is the same underlying resource as <i>that</i>. + * Returns true if the backing resource of <i>this</i> is the same as the backing resource of <i>that</i>. + * This returns false if <i>this</i> and <i>that</i> are both on-heap and one of them is read-only. * @param that the other Resource object - * @return a long value representing the ordering and size of overlap between <i>this</i> and <i>that</i> + * @return true if the backing resource of <i>this</i> is the same as the backing resource of <i>that</i>. */ boolean isSameResource(Resource that); @@ -322,6 +324,7 @@ mismatch(MemorySegment, long, long, MemorySegment, long, long)</a> * @param arena the given arena. * If the desired result is to be off-heap, the arena must not be null. * Otherwise, the result will be on-heap. + * Warning: This class is not thread-safe. Specifying an Arena that allows multiple threads is not recommended. * @param alignment requested segment alignment. Typically 1, 2, 4 or 8. * @return a copy of the underlying MemorySegment in the given arena. */ diff --git a/src/main/java/org/apache/datasketches/memory/WritableMemory.java b/src/main/java/org/apache/datasketches/memory/WritableMemory.java index 23d0ba00..69656ac1 100644 --- a/src/main/java/org/apache/datasketches/memory/WritableMemory.java +++ b/src/main/java/org/apache/datasketches/memory/WritableMemory.java @@ -117,11 +117,10 @@ public interface WritableMemory extends Memory { * Allocates and provides access to capacityBytes directly in native (off-heap) memory. * The allocated memory will be 8-byte aligned. * Native byte order is assumed. - * A new DefaultMemoryRequestServer() is created. + * A new DefaultMemoryRequestServer() is created, which allocates on-heap. * - * <p><b>NOTE:</b> Native/Direct memory acquired may have garbage in it. - * It is the responsibility of the using application to clear this memory, if required, - * and to call <i>close()</i> when done.</p> + * <p><b>NOTE:</b>It is the responsibility of the using application to call + * <i>WritableMemory::getArena().close()</i> when done.</p> * @param capacityBytes the size of the desired memory in bytes. * @param arena the given arena to manage the new off-heap WritableMemory. It must be non-null. * Warning: This class is not thread-safe. Specifying an Arena that allows multiple threads is not recommended. @@ -132,6 +131,25 @@ public interface WritableMemory extends Memory { return allocateDirect(capacityBytes, 8, ByteOrder.nativeOrder(), new DefaultMemoryRequestServer(), arena); } + /** + * Allocates and provides access to capacityBytes directly in native (off-heap) memory. + * The allocated memory will be 8-byte aligned. + * Native byte order is assumed. + * + * <p><b>NOTE:</b>It is the responsibility of the using application to call + * <i>WritableMemory::getArena().close()</i> when done.</p> + * @param capacityBytes the size of the desired memory in bytes. + * @param memReqSvr A user-specified MemoryRequestServer, which may be null. + * This is a callback mechanism for a user client of direct memory to request more memory. + * @param arena the given arena to manage the new off-heap WritableMemory. It must be non-null. + * Warning: This class is not thread-safe. Specifying an Arena that allows multiple threads is not recommended. + * + * @return a WritableMemory for this off-heap resource. + */ + static WritableMemory allocateDirect(long capacityBytes, MemoryRequestServer memReqSvr, Arena arena) { + return allocateDirect(capacityBytes, 8, ByteOrder.nativeOrder(), memReqSvr, arena); + } + /** * Allocates and provides access to capacityBytes directly in native (off-heap) memory. * The allocated memory will be aligned to the given <i>alignmentBytes</i>. @@ -340,7 +358,7 @@ public interface WritableMemory extends Memory { ByteOrder byteOrder, MemoryRequestServer memReqSvr) { final MemorySegment slice = MemorySegment.ofArray(array).asSlice(offsetBytes, lengthBytes); - return WritableMemoryImpl.wrapSegmentAsArray(slice, byteOrder, memReqSvr); + return WritableMemoryImpl.wrapSegment(slice, byteOrder, memReqSvr); } //intentionally removed writableWrap(boolean[]) @@ -352,7 +370,7 @@ public interface WritableMemory extends Memory { */ static WritableMemory writableWrap(char[] array) { final MemorySegment seg = MemorySegment.ofArray(array); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -362,7 +380,7 @@ public interface WritableMemory extends Memory { */ static WritableMemory writableWrap(short[] array) { final MemorySegment seg = MemorySegment.ofArray(array); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -372,7 +390,7 @@ public interface WritableMemory extends Memory { */ static WritableMemory writableWrap(int[] array) { final MemorySegment seg = MemorySegment.ofArray(array); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -382,7 +400,7 @@ public interface WritableMemory extends Memory { */ static WritableMemory writableWrap(long[] array) { final MemorySegment seg = MemorySegment.ofArray(array); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -392,7 +410,7 @@ public interface WritableMemory extends Memory { */ static WritableMemory writableWrap(float[] array) { final MemorySegment seg = MemorySegment.ofArray(array); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } /** @@ -402,7 +420,7 @@ public interface WritableMemory extends Memory { */ static WritableMemory writableWrap(double[] array) { final MemorySegment seg = MemorySegment.ofArray(array); - return WritableMemoryImpl.wrapSegmentAsArray(seg, ByteOrder.nativeOrder(), null); + return WritableMemoryImpl.wrapSegment(seg, ByteOrder.nativeOrder()); } //END OF CONSTRUCTOR-TYPE METHODS diff --git a/src/main/java/org/apache/datasketches/memory/internal/ResourceImpl.java b/src/main/java/org/apache/datasketches/memory/internal/ResourceImpl.java index 459af659..d4030b47 100644 --- a/src/main/java/org/apache/datasketches/memory/internal/ResourceImpl.java +++ b/src/main/java/org/apache/datasketches/memory/internal/ResourceImpl.java @@ -453,10 +453,13 @@ abstract class ResourceImpl implements Resource { @Override public final boolean isSameResource(final Resource that) { - Objects.requireNonNull(that); + if (that == null) { return false; } final ResourceImpl that2 = (ResourceImpl) that; - return this.seg.address() == that2.seg.address() - && this.seg.byteSize() == that2.seg.byteSize(); + if (this.arena == null && that2.arena == null) { //both on heap + if (this.seg.isReadOnly() || that2.seg.isReadOnly()) { return false; } + return this.seg.heapBase().get() == that2.seg.heapBase().get(); + } + return this.seg.address() == that2.seg.address(); } @Override diff --git a/src/main/java/org/apache/datasketches/memory/internal/WritableMemoryImpl.java b/src/main/java/org/apache/datasketches/memory/internal/WritableMemoryImpl.java index 4604e092..339c434b 100644 --- a/src/main/java/org/apache/datasketches/memory/internal/WritableMemoryImpl.java +++ b/src/main/java/org/apache/datasketches/memory/internal/WritableMemoryImpl.java @@ -57,16 +57,35 @@ public abstract class WritableMemoryImpl extends ResourceImpl implements Writabl super(seg, typeId, memReqSvr, arena); } - //WRAP HEAP ARRAY RESOURCE + //WRAP SEGMENT RESOURCE /** - * Wrap a <i>MemorySegment</i> as an array + * Wrap a <i>MemorySegment</i>. + * @param seg the given <i>MemorySegment</i>. It must be non-null. + * @param byteOrder the given <i>ByteOrder</i>. It must be non-null. + * @return a <i>WritableMemory</i>. + */ + public static WritableMemory wrapSegment( + final MemorySegment seg, + final ByteOrder byteOrder) { + Objects.requireNonNull(byteOrder, "byteOrder must be non-null"); + int type = MEMORY + | (seg.isReadOnly() ? READONLY : 0); + if (byteOrder == NON_NATIVE_BYTE_ORDER) { + type |= NONNATIVE_BO; + return new NonNativeWritableMemoryImpl(seg, type, null, null); + } + return new NativeWritableMemoryImpl(seg, type, null, null); + } + + /** + * Wrap a <i>MemorySegment</i>. * @param seg the given <i>MemorySegment</i>. It must be non-null. * @param byteOrder the given <i>ByteOrder</i>. It must be non-null. * @param memReqSvr the given <i>MemoryRequestServer</i>. It may be null. * @return a <i>WritableMemory</i>. */ - public static WritableMemory wrapSegmentAsArray( + public static WritableMemory wrapSegment( final MemorySegment seg, final ByteOrder byteOrder, final MemoryRequestServer memReqSvr) { diff --git a/src/test/java/org/apache/datasketches/memory/internal/AllocateDirectMemoryTest.java b/src/test/java/org/apache/datasketches/memory/internal/AllocateDirectMemoryTest.java index dd0049e2..135d236f 100644 --- a/src/test/java/org/apache/datasketches/memory/internal/AllocateDirectMemoryTest.java +++ b/src/test/java/org/apache/datasketches/memory/internal/AllocateDirectMemoryTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue; import java.lang.foreign.Arena; import java.nio.ByteOrder; +import org.apache.datasketches.memory.DefaultMemoryRequestServer; import org.apache.datasketches.memory.MemoryRequestServer; import org.apache.datasketches.memory.Resource; import org.apache.datasketches.memory.WritableMemory; @@ -56,28 +57,76 @@ public class AllocateDirectMemoryTest { @Test public void checkDefaultMemoryRequestServer() { + boolean oneArena = false; + boolean offHeap = false; + checkDefaultMemoryRequestServerVariations(false, false, false); + checkDefaultMemoryRequestServerVariations(false, false, true); + checkDefaultMemoryRequestServerVariations(false, true, false); + checkDefaultMemoryRequestServerVariations(false, true, true); + checkDefaultMemoryRequestServerVariations(true, false, false); + checkDefaultMemoryRequestServerVariations(true, false, true); + checkDefaultMemoryRequestServerVariations(true, true, false); + checkDefaultMemoryRequestServerVariations(true, true, true); + } + + private void checkDefaultMemoryRequestServerVariations(boolean origArena, boolean oneArena, boolean offHeap) { int longs1 = 32; int bytes1 = longs1 << 3; - try (Arena arena = Arena.ofConfined()) { - WritableMemory origWmem = WritableMemory.allocateDirect(bytes1, 8, ByteOrder.LITTLE_ENDIAN, Resource.defaultMemReqSvr, arena); + WritableMemory origWmem, newWmem; + + if (origArena) { + MemoryRequestServer dmrs = new DefaultMemoryRequestServer(8, ByteOrder.nativeOrder(), oneArena, offHeap); + try (Arena arena = Arena.ofConfined()) { + origWmem = WritableMemory.allocateDirect(bytes1, 8, ByteOrder.LITTLE_ENDIAN, dmrs, arena); + assertTrue(origWmem.isDirect()); + for (int i = 0; i < longs1; i++) { //puts data in origWmem + origWmem.putLong(i << 3, i); + assertEquals(origWmem.getLong(i << 3), i); + } + println(origWmem.toString("Test", 0, longs1 << 3, true)); + + int longs2 = 2 * longs1; + int bytes2 = longs2 << 3; + MemoryRequestServer myMemReqSvr = origWmem.getMemoryRequestServer(); + + newWmem = myMemReqSvr.request(origWmem, bytes2); + assertTrue( (offHeap && origArena) ? newWmem.isDirect() : newWmem.isHeap() ); + for (int i = 0; i < longs2; i++) { + newWmem.putLong(i << 3, i); + assertEquals(newWmem.getLong(i << 3), i); + } + println(newWmem.toString("Test", 0, longs2 << 3, true)); + if (oneArena && offHeap) { assertTrue((newWmem.getArena() == origWmem.getArena()) && origWmem != null); } + if (oneArena && !offHeap) { assertTrue((newWmem.getArena() == null) && origWmem != null); } + if (!oneArena && offHeap) { assertTrue((newWmem.getArena() != origWmem.getArena()) && origWmem != null); } + } //allow the TWR to close the origWmem resource + assertFalse(origWmem.getArena().scope().isAlive()); + if (!oneArena && offHeap) { + newWmem.getArena().close(); + assertFalse(newWmem.getArena().scope().isAlive()); + } + + } else { + MemoryRequestServer dmrs = new DefaultMemoryRequestServer(8, ByteOrder.nativeOrder(), oneArena, offHeap); + origWmem = WritableMemory.allocate(bytes1,ByteOrder.LITTLE_ENDIAN, dmrs); for (int i = 0; i < longs1; i++) { //puts data in origWmem origWmem.putLong(i << 3, i); assertEquals(origWmem.getLong(i << 3), i); } - println(origWmem.toString("Test", 0, 32 * 8, true)); + println(origWmem.toString("Test", 0, longs1 << 3, true)); - int longs2 = 64; + int longs2 = 2 * longs1; int bytes2 = longs2 << 3; - origWmem.setMemoryRequestServer(Resource.defaultMemReqSvr); MemoryRequestServer myMemReqSvr = origWmem.getMemoryRequestServer(); - WritableMemory newWmem = myMemReqSvr.request(bytes2, 8, ByteOrder.LITTLE_ENDIAN, null); //null -> on-heap - assertTrue(newWmem.isHeap()); + newWmem = myMemReqSvr.request(origWmem, bytes2); + assertTrue( (offHeap && origArena) ? newWmem.isDirect() : newWmem.isHeap() ); for (int i = 0; i < longs2; i++) { newWmem.putLong(i << 3, i); assertEquals(newWmem.getLong(i << 3), i); } - } //allow the TWR to close all resources + println(newWmem.toString("Test", 0, longs2 << 3, true)); + } } @Test diff --git a/src/test/java/org/apache/datasketches/memory/internal/DruidIssue11544Test.java b/src/test/java/org/apache/datasketches/memory/internal/DruidIssue11544Test.java index a9e4d631..ea116a82 100644 --- a/src/test/java/org/apache/datasketches/memory/internal/DruidIssue11544Test.java +++ b/src/test/java/org/apache/datasketches/memory/internal/DruidIssue11544Test.java @@ -59,26 +59,26 @@ public class DruidIssue11544Test { //Wrap bb into WritableMemory WritableMemory mem1 = WritableMemory.writableWrap(bb); - //ByteBuffers are automatically assigned an implicit shared scope (non-closeable) + //ByteBuffers are not directly closeable. They are closed by the GC. assertTrue(mem1.isDirect()); //confirm mem1 is off-heap - + assertTrue(mem1.getArena() == null); //and Arena is null //Request Bigger Memory on heap int size2 = size1 * 2; - WritableMemory mem2 = myMemReqSvr.request(size2, 8, ByteOrder.LITTLE_ENDIAN, null); + WritableMemory mem2 = myMemReqSvr.request(mem1, size2); - //Confirm that mem2 is on the heap (the default) and 2X size1 + //Confirm that mem2 is on the heap and 2X size1 assertFalse(mem2.isDirect()); assertEquals(mem2.getCapacity(), size2); //Move data to new memory mem1.copyTo(0, mem2, 0, size1); - assertTrue(mem1.isAlive()); + assertTrue(mem1.isAlive()); //because mem1 seg is holding a reference to it. assertTrue(mem2.isAlive()); //Now we are on the heap and need to grow again: int size3 = size2 * 2; - WritableMemory mem3 = myMemReqSvr.request(size3, 8, ByteOrder.LITTLE_ENDIAN, null); + WritableMemory mem3 = myMemReqSvr.request(mem2, size3); //Confirm that mem3 is still on the heap and 2X of size2 assertFalse(mem3.isDirect()); diff --git a/src/test/java/org/apache/datasketches/memory/internal/ExampleMemoryRequestServerTest.java b/src/test/java/org/apache/datasketches/memory/internal/ExampleMemoryRequestServerTest.java index 55dd7884..d24005f0 100644 --- a/src/test/java/org/apache/datasketches/memory/internal/ExampleMemoryRequestServerTest.java +++ b/src/test/java/org/apache/datasketches/memory/internal/ExampleMemoryRequestServerTest.java @@ -24,6 +24,7 @@ import java.nio.ByteOrder; import org.apache.datasketches.memory.DefaultMemoryRequestServer; import org.apache.datasketches.memory.MemoryRequestServer; +import org.apache.datasketches.memory.Resource; import org.apache.datasketches.memory.WritableMemory; import org.testng.annotations.Test; @@ -35,7 +36,6 @@ import org.testng.annotations.Test; * @author Lee Rhodes */ public class ExampleMemoryRequestServerTest { - private static long alignmentBytes = 8; /** * This version is without a TWR block. All of the memory allocations are done through the MemoryRequestServer @@ -48,15 +48,13 @@ public class ExampleMemoryRequestServerTest { long workingMemBytes = 8; Arena arena = Arena.ofConfined(); - //Configure the default memReqSvr to create new memory off-heap and copy data from old to new - MemoryRequestServer memReqSvr = new DefaultMemoryRequestServer(); + //Configure the memReqSvr to create new subsequent allocations off-heap, each with a new Arena. + MemoryRequestServer myMemReqSvr = new DefaultMemoryRequestServer(8, ByteOrder.nativeOrder(), false, true); //Create the initial working memory for the client WritableMemory workingMem = WritableMemory.allocateDirect( workingMemBytes, - alignmentBytes, - ByteOrder.nativeOrder(), - memReqSvr, + myMemReqSvr, arena); MemoryHungryClient client = new MemoryHungryClient(workingMem); @@ -95,14 +93,14 @@ public class ExampleMemoryRequestServerTest { oldWorkingCap = newWorkingCap; newWorkingCap = 2 * oldWorkingCap; Arena arena = Arena.ofConfined(); // new confined scope for each iteration - newMem = memReqSvr.request(newWorkingCap, alignmentBytes, ByteOrder.LITTLE_ENDIAN, arena); + newMem = memReqSvr.request(workingMem, newWorkingCap); //done with old memory, close it - memReqSvr.requestClose(workingMem.getArena()); + memReqSvr.requestClose(workingMem); workingMem = newMem; itr++; } - + //close the last allocation workingMem.getArena().close(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
