This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 81cab8f4869 [FLINK-32213][core] Add get off heap buffer in segment
81cab8f4869 is described below
commit 81cab8f486997ef666128cce4903c24d44ac7534
Author: Shammon FY <[email protected]>
AuthorDate: Tue Jun 6 10:18:14 2023 +0800
[FLINK-32213][core] Add get off heap buffer in segment
This closes #22675
---
.../org/apache/flink/core/memory/MemorySegment.java | 14 ++++++++++++++
.../runtime/memory/MemorySegmentSimpleTest.java | 20 ++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index ce39a122654..93e81fc5be8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -279,6 +279,20 @@ public final class MemorySegment {
}
}
+ /**
+ * Returns the off-heap buffer of memory segments.
+ *
+ * @return underlying off-heap buffer
+ * @throws IllegalStateException if the memory segment does not represent
off-heap buffer
+ */
+ public ByteBuffer getOffHeapBuffer() {
+ if (offHeapBuffer != null) {
+ return offHeapBuffer;
+ } else {
+ throw new IllegalStateException("Memory segment does not represent
off-heap buffer");
+ }
+ }
+
/**
* Returns the memory address of off-heap memory segments.
*
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
index 8301a355862..d2a8da32f1c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
@@ -34,6 +34,8 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** Test reading and writing primitive types to {@link MemorySegment}. */
public class MemorySegmentSimpleTest {
@@ -574,4 +576,22 @@ public class MemorySegmentSimpleTest {
Assert.fail(e.getMessage());
}
}
+
+ @Test
+ public void testGetOffHeapBuffer() {
+ MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);
+ assertThrows(
+ IllegalStateException.class,
+ seg::getOffHeapBuffer,
+ "Memory segment does not represent off-heap buffer");
+ seg.free();
+
+ seg = MemorySegmentFactory.allocateUnpooledOffHeapMemory(1024);
+ assertNotNull(seg.getOffHeapBuffer());
+ seg.free();
+
+ seg = MemorySegmentFactory.allocateOffHeapUnsafeMemory(1024);
+ assertNotNull(seg.getOffHeapBuffer());
+ seg.free();
+ }
}