KarmaGYZ commented on a change in pull request #15258:
URL: https://github.com/apache/flink/pull/15258#discussion_r596706763



##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
##########
@@ -99,72 +99,106 @@
      * segment will point to undefined addresses outside the heap and may in out-of-order execution
      * cases cause segmentation faults.
      */
-    protected final byte[] heapMemory;
+    @Nullable private final byte[] heapMemory;
+
+    /**
+     * The direct byte buffer that wraps the off-heap memory. This memory segment holds a reference
+     * to that buffer, so as long as this memory segment lives, the memory will not be released.
+     */
+    @Nullable private ByteBuffer offHeapBuffer;
 
     /**
      * The address to the data, relative to the heap memory byte array. If the heap memory byte
      * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
      */
-    protected long address;
+    private long address;
 
     /**
      * The address one byte after the last addressable byte, i.e. <tt>address + size</tt> while the
      * segment is not disposed.
      */
-    protected final long addressLimit;
+    private final long addressLimit;
 
     /** The size in bytes of the memory segment. */
-    protected final int size;
+    private final int size;
 
     /** Optional owner of the memory segment. */
-    private final Object owner;
+    @Nullable private final Object owner;

Review comment:
       Please also annotate `getOwner()` with `@Nullable`, since it returns this field.
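
       A minimal sketch of what this suggestion could look like, assuming the getter simply returns the field. `Nullable` below is a hypothetical stand-in for `javax.annotation.Nullable` so that the snippet compiles on its own, and `OwnedSegment` is an illustrative class, not the actual `MemorySegment`:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical stand-in for javax.annotation.Nullable, so the sketch has no dependencies.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@interface Nullable {}

// Illustrative class only; not the real MemorySegment.
class OwnedSegment {
    /** Optional owner of the memory segment. */
    @Nullable private final Object owner;

    OwnedSegment(@Nullable Object owner) {
        this.owner = owner;
    }

    /** Returns the owner, or null if the segment has none. */
    @Nullable
    Object getOwner() {
        return owner;
    }
}
```

       With the annotation on the getter as well, callers and static analysis see the same nullability contract that the field already declares.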

##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
##########
@@ -254,7 +293,32 @@ public long getAddress() {
      * @throws IndexOutOfBoundsException Thrown, if offset is negative or larger than the memory
      *     segment size, or if the offset plus the length is larger than the segment size.
      */
-    public abstract ByteBuffer wrap(int offset, int length);
+    public ByteBuffer wrap(int offset, int length) {
+        if (!allowWrap) {
+            throw new UnsupportedOperationException(
+                    "Wrap is not supported by this segment. This usually indicates that the underlying memory is unsafe, thus transferring of ownership is not allowed.");
+        }
+        return wrapInternal(offset, length);
+    }
+
+    private ByteBuffer wrapInternal(int offset, int length) {
+        if (address <= addressLimit) {
+            if (heapMemory != null) {
+                return ByteBuffer.wrap(heapMemory, offset, length);
+            } else {
+                try {
+                    ByteBuffer wrapper = offHeapBuffer.duplicate();

Review comment:
       Use `Preconditions.checkNotNull(offHeapBuffer).duplicate()` here, since `offHeapBuffer` is now `@Nullable`.
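
       A rough sketch of the idea, under a few assumptions: `OffHeapWrapSketch` is an illustrative class rather than the real `MemorySegment`, `java.util.Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` so the snippet has no dependencies, and the limit/position handling after `duplicate()` is a guess, since the quoted hunk ends at that line:

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Illustrative class only; not the real MemorySegment.
class OffHeapWrapSketch {
    // May be null, e.g. when the segment is backed by heap memory instead.
    private final ByteBuffer offHeapBuffer;

    OffHeapWrapSketch(ByteBuffer offHeapBuffer) {
        this.offHeapBuffer = offHeapBuffer;
    }

    ByteBuffer wrap(int offset, int length) {
        // Objects.requireNonNull stands in for Preconditions.checkNotNull here;
        // both throw a NullPointerException on this line if the argument is null.
        ByteBuffer wrapper = Objects.requireNonNull(offHeapBuffer).duplicate();
        // Assumed continuation: restrict the duplicate to [offset, offset + length).
        wrapper.limit(offset + length);
        wrapper.position(offset);
        return wrapper;
    }
}
```

       The explicit check makes the non-null assumption visible at the dereference and satisfies nullability analysis on the `@Nullable` field.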



