carp84 commented on a change in pull request #10447: [FLINK-15084][runtime] Add
shared resources tracking to MemoryManager
URL: https://github.com/apache/flink/pull/10447#discussion_r354437363
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
##########
@@ -535,6 +541,103 @@ public void releaseAllMemory(Object owner, MemoryType
memoryType) {
});
}
+ //
------------------------------------------------------------------------
+ // Shared opaque memory resources
+ //
------------------------------------------------------------------------
+
+ /**
+ * Acquires a shared memory resource, that uses all the memory of this
memory manager.
+ * This method behaves otherwise exactly as {@link
#getSharedMemoryResourceForManagedMemory(String, LongFunctionWithException,
double)}.
+ */
+ public <T extends AutoCloseable> OpaqueMemoryResource<T>
getSharedMemoryResourceForManagedMemory(
+ String type,
+ LongFunctionWithException<T, Exception> initializer)
throws Exception {
+
+ return getSharedMemoryResourceForManagedMemory(type,
initializer, 1.0);
+ }
+
+ /**
+ * Acquires a shared memory resource, identified by a type string. If
the resource already exists, this
+ * returns a descriptor to the resource. If the resource does not yet
exist, the given memory fraction
+ * is reserved and the resource is initialized with that size.
+ *
+ * <p>The memory for the resource is reserved from the memory budget of
this memory manager (thus
+ * determining the size of the resource), but resource itself is
opaque, meaning the memory manager
+ * does not understand its structure.
+ *
+ * <p>The OpaqueMemoryResource object returned from this method must be
closed once not used any further.
+ * Once all acquisitions have closed the object, the resource itself is
closed.
+ */
+ public <T extends AutoCloseable> OpaqueMemoryResource<T>
getSharedMemoryResourceForManagedMemory(
+ String type,
+ LongFunctionWithException<T, Exception> initializer,
+ double fractionToInitializeWith) throws Exception {
+
+ // if we need to allocate the resource (no shared resource
allocated, yet), this would be the size to use
+ final long numBytes =
computeMemorySize(fractionToInitializeWith);
+
+ // the initializer attempt to reserve the memory before actual
initialization
+ final LongFunctionWithException<T, Exception>
reserveAndInitialize = (size) -> {
+ try {
+ reserveMemory(type, MemoryType.OFF_HEAP, size);
+ } catch (MemoryReservationException e) {
+ throw new MemoryAllocationException("Could not
created the shared memory resource of size " + size +
+ ". Not enough memory left to reserve
from the slot's managed memory.", e);
+ }
+
+ return initializer.apply(size);
+ };
+
+ // This object identifies the lease in this request. It is used
only to identify the release operation.
+ // Using the object to represent the lease is a bit nicer safer
than just using a reference counter.
+ final Object leaseHolder = new Object();
+
+ final SharedResources.ResourceAndSize<T> resource =
+
sharedResources.getOrAllocateSharedResource(type, leaseHolder,
reserveAndInitialize, numBytes);
+
+ // the actual size may theoretically be different from what we
requested, if allocated it was by
+ // someone else before with a different value for fraction
(should not happen in practice, though).
+ final long size = resource.size();
+
+ final ThrowingRunnable<Exception> disposer = () -> {
+ final boolean allDisposed =
sharedResources.release(type, leaseHolder);
+ if (allDisposed) {
+ releaseMemory(type,
MemoryType.OFF_HEAP, size);
+ }
+ };
+
+ return new OpaqueMemoryResource<>(resource.resourceHandle(),
size, disposer);
+ }
+
+ /**
+ * Acquires a shared memory resource, identified by a type string. If
the resource already exists, this
+ * returns a descriptor to the resource. If the resource does not yet
exist, the given memory fraction
+ * is reserved and the resource is initialized with that size.
+ *
+ * <p>The memory for the resource is reserved from the memory budget of
this memory manager (thus
+ * determining the size of the resource), but resource itself is
opaque, meaning the memory manager
+ * does not understand its structure.
+ *
+ * <p>The OpaqueMemoryResource object returned from this method must be
closed once not used any further.
+ * Once all acquisitions have closed the object, the resource itself is
closed.
Review comment:
This javadoc seems to be a copy of the one from
`getSharedMemoryResourceForManagedMemory` and needs to be refined.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services