This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 492d1d4 [FLINK-11724][core] Add copyToUnsafe, copyFromUnsafe and
equalTo to MemorySegment.
492d1d4 is described below
commit 492d1d40100b1297281408bf0d83b6db5378b9cb
Author: JingsongLi <[email protected]>
AuthorDate: Wed Feb 27 19:37:52 2019 +0800
[FLINK-11724][core] Add copyToUnsafe, copyFromUnsafe and equalTo to
MemorySegment.
This closes #7847
---
.../apache/flink/core/memory/MemorySegment.java | 77 ++++++++++++++++++++++
.../flink/core/memory/CrossSegmentTypeTest.java | 10 +++
.../flink/core/memory/MemorySegmentTestBase.java | 35 ++++++++++
3 files changed, 122 insertions(+)
diff --git
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index b95ceb9..39b6d9c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -1270,6 +1270,50 @@ public abstract class MemorySegment {
}
}
+ /**
+  * Bulk copy method. Copies {@code numBytes} bytes from this memory segment to the given
+  * unsafe target object and pointer.
+  *
+  * <p>NOTE: This is an unsafe method. Only the range read from this segment is bounds-checked;
+  * {@code target} and {@code targetPointer} are handed to {@code UNSAFE.copyMemory} without
+  * any validation, so the caller must make sure they denote at least {@code numBytes}
+  * writable bytes.
+  *
+  * @param offset The position in this memory segment where reading starts.
+  * @param target The unsafe memory to copy the bytes to.
+  * @param targetPointer The position in the target unsafe memory to copy the chunk to.
+  * @param numBytes The number of bytes to copy.
+  *
+  * @throws IndexOutOfBoundsException If {@code offset} or {@code numBytes} is negative, or if
+  *         this segment does not contain the given number of bytes (starting from offset).
+  */
+ public final void copyToUnsafe(int offset, Object target, int targetPointer, int numBytes) {
+     final long thisPointer = this.address + offset;
+     // A negative offset or length would slip past the upper-bound comparison and let
+     // copyMemory touch memory outside this segment, so both are rejected up front.
+     if ((offset | numBytes) < 0 || thisPointer + numBytes > addressLimit) {
+         throw new IndexOutOfBoundsException(
+                 String.format("offset=%d, numBytes=%d, address=%d",
+                         offset, numBytes, this.address));
+     }
+     UNSAFE.copyMemory(this.heapMemory, thisPointer, target, targetPointer, numBytes);
+ }
+
+ /**
+  * Bulk copy method. Copies {@code numBytes} bytes from the given unsafe source object and
+  * pointer into this memory segment.
+  *
+  * <p>NOTE: This is an unsafe method. Only the range written in this segment is bounds-checked;
+  * {@code source} and {@code sourcePointer} are handed to {@code UNSAFE.copyMemory} without
+  * any validation, so the caller must make sure they denote at least {@code numBytes}
+  * readable bytes.
+  *
+  * @param offset The position in this memory segment where writing starts.
+  * @param source The unsafe memory to copy the bytes from.
+  * @param sourcePointer The position in the source unsafe memory to copy the chunk from.
+  * @param numBytes The number of bytes to copy.
+  *
+  * @throws IndexOutOfBoundsException If {@code offset} or {@code numBytes} is negative, or if
+  *         this segment cannot contain the given number of bytes (starting from offset).
+  */
+ public final void copyFromUnsafe(int offset, Object source, int sourcePointer, int numBytes) {
+     final long thisPointer = this.address + offset;
+     // A negative offset or length would slip past the upper-bound comparison and let
+     // copyMemory write outside this segment, so both are rejected up front.
+     if ((offset | numBytes) < 0 || thisPointer + numBytes > addressLimit) {
+         throw new IndexOutOfBoundsException(
+                 String.format("offset=%d, numBytes=%d, address=%d",
+                         offset, numBytes, this.address));
+     }
+     UNSAFE.copyMemory(source, sourcePointer, this.heapMemory, thisPointer, numBytes);
+ }
+
//
-------------------------------------------------------------------------
// Comparisons & Swapping
//
-------------------------------------------------------------------------
@@ -1349,4 +1393,37 @@ public abstract class MemorySegment {
String.format("offset1=%d, offset2=%d,
len=%d, bufferSize=%d, address1=%d, address2=%d",
offset1, offset2, len,
tempBuffer.length, this.address, seg2.address));
}
+
+ /**
+  * Checks whether a region of this memory segment is equal to a region of another
+  * memory segment.
+  *
+  * @param seg2 Segment to compare this segment with
+  * @param offset1 Offset in this segment where the compared region starts
+  * @param offset2 Offset in seg2 where the compared region starts
+  * @param length Length of the compared memory region
+  *
+  * @return true if equal, false otherwise
+  */
+ public final boolean equalTo(MemorySegment seg2, int offset1, int offset2, int length) {
+     // Compare 8 bytes at a time while a full long still fits into the region;
+     // this assumes unaligned accesses are supported.
+     final int lastLongStart = length - 8;
+     int pos = 0;
+     for (; pos <= lastLongStart; pos += 8) {
+         if (getLong(offset1 + pos) != seg2.getLong(offset2 + pos)) {
+             return false;
+         }
+     }
+
+     // Cover the last (length % 8) elements one by one.
+     for (; pos < length; pos++) {
+         if (get(offset1 + pos) != seg2.get(offset2 + pos)) {
+             return false;
+         }
+     }
+
+     return true;
+ }
}
diff --git
a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
index ea144c7..ccff2ba 100644
---
a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
+++
b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
@@ -35,6 +35,8 @@ import static org.junit.Assert.fail;
*/
public class CrossSegmentTypeTest {
+ private static final long BYTE_ARRAY_BASE_OFFSET =
MemoryUtils.UNSAFE.arrayBaseOffset(byte[].class);
+
private final int pageSize = 32 * 1024;
//
------------------------------------------------------------------------
@@ -187,6 +189,8 @@ public class CrossSegmentTypeTest {
byte[] expected = new byte[pageSize];
byte[] actual = new byte[pageSize];
+ byte[] unsafeCopy = new byte[pageSize];
+ MemorySegment unsafeCopySeg =
MemorySegmentFactory.allocateUnpooledSegment(pageSize);
// zero out the memory
seg1.put(0, expected);
@@ -205,6 +209,12 @@ public class CrossSegmentTypeTest {
seg1.put(thisPos, bytes);
seg1.copyTo(thisPos, seg2, otherPos, numBytes);
+ seg1.copyToUnsafe(thisPos, unsafeCopy, (int) (otherPos
+ BYTE_ARRAY_BASE_OFFSET), numBytes);
+
+ int otherPos2 = random.nextInt(pageSize - numBytes);
+ unsafeCopySeg.copyFromUnsafe(otherPos2, unsafeCopy,
+ (int) (otherPos +
BYTE_ARRAY_BASE_OFFSET), numBytes);
+ assertTrue(unsafeCopySeg.equalTo(seg2, otherPos2,
otherPos, numBytes));
}
seg2.get(0, actual);
diff --git
a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
index fb28948..0b8f1d0 100644
---
a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
+++
b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
@@ -304,6 +304,41 @@ public abstract class MemorySegmentTestBase {
}
@Test
+ public void testCopyUnsafeIndexOutOfBounds() {
+     final MemorySegment seg = createSegment(pageSize);
+     final byte[] buffer = new byte[pageSize];
+
+     // Reading pageSize bytes starting at offset 1 is expected to be rejected
+     // by the segment-side bounds check.
+     try {
+         seg.copyToUnsafe(1, buffer, 0, pageSize);
+         fail("should fail with an IndexOutOfBoundsException");
+     }
+     catch (IndexOutOfBoundsException ignored) {
+         // expected
+     }
+
+     // The same range must be rejected when writing into the segment.
+     try {
+         seg.copyFromUnsafe(1, buffer, 0, pageSize);
+         fail("should fail with an IndexOutOfBoundsException");
+     }
+     catch (IndexOutOfBoundsException ignored) {
+         // expected
+     }
+ }
+
+ @Test
+ public void testEqualTo() {
+     final MemorySegment first = createSegment(pageSize);
+     final MemorySegment second = createSegment(pageSize);
+
+     // Random start of a 9-byte region: one 8-byte comparison plus one trailing byte.
+     final int start = new Random().nextInt(pageSize - 8);
+
+     // A difference in the first byte falls into the 8-byte comparison.
+     first.put(start, (byte) 10);
+     assertFalse(first.equalTo(second, start, start, 9));
+
+     // Undo the difference; the regions must compare equal again.
+     first.put(start, (byte) 0);
+     assertTrue(first.equalTo(second, start, start, 9));
+
+     // A difference in the ninth byte falls into the byte-wise tail comparison.
+     first.put(start + 8, (byte) 10);
+     assertFalse(first.equalTo(second, start, start, 9));
+ }
+
+ @Test
public void testCharAccess() {
final MemorySegment segment = createSegment(pageSize);