http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegment.java ---------------------------------------------------------------------- diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegment.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegment.java new file mode 100644 index 0000000..f36fcd9 --- /dev/null +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegment.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.benchmark.core.memory.segments; + +import org.apache.flink.core.memory.MemoryUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public final class PureHeapMemorySegment { + + /** Constant that flags the byte order. Because this is a boolean constant, + * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */ + private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + + /** The array in which the data is stored. */ + private byte[] memory; + + /** Wrapper for I/O requests. */ + private ByteBuffer wrapper; + + /** The size, stored extra, because we may clear the reference to the byte array */ + private final int size; + + // ------------------------------------------------------------------------- + // Constructors + // ------------------------------------------------------------------------- + + /** + * Creates a new memory segment that represents the data in the given byte array. + * + * @param memory The byte array that holds the data. + */ + public PureHeapMemorySegment(byte[] memory) { + this.memory = memory; + this.size = memory.length; + } + + // ------------------------------------------------------------------------- + // Direct Memory Segment Specifics + // ------------------------------------------------------------------------- + + /** + * Gets the byte array that backs this memory segment. + * + * @return The byte array that backs this memory segment. + */ + public byte[] getArray() { + return this.memory; + } + + // ------------------------------------------------------------------------- + // MemorySegment Accessors + // ------------------------------------------------------------------------- + + public final boolean isFreed() { + return this.memory == null; + } + + public final void free() { + this.wrapper = null; + this.memory = null; + } + + public final int size() { + return this.size; + } + + public final ByteBuffer wrap(int offset, int length) { + if (offset > this.memory.length || offset > this.memory.length - length) { + throw new IndexOutOfBoundsException(); + } + + if (this.wrapper == null) { + this.wrapper = ByteBuffer.wrap(this.memory, offset, length); + } + else { + this.wrapper.limit(offset + length); + this.wrapper.position(offset); + } + + return this.wrapper; + } + + // ------------------------------------------------------------------------ + // Random Access get() and put() methods + // ------------------------------------------------------------------------ + + public final byte get(int index) { + return this.memory[index]; + } + + public final void put(int index, byte b) { + this.memory[index] = b; + } + + public final void get(int index, byte[] dst) { + get(index, dst, 0, dst.length); + } + + public final void put(int index, byte[] src) { + put(index, src, 0, src.length); + } + + public final void get(int index, byte[] dst, int offset, int length) { + // system arraycopy does the boundary checks anyways, no need to check extra + System.arraycopy(this.memory, index, dst, offset, length); + } + + public final void put(int index, byte[] src, int offset, int length) { + // system arraycopy does the boundary checks anyways, no need to check extra + System.arraycopy(src, offset, this.memory, index, length); + } + + public final boolean getBoolean(int index) { + return this.memory[index] != 0; + } + + public final void putBoolean(int index, boolean value) { + this.memory[index] = (byte) (value ? 1 : 0); + } + + @SuppressWarnings("restriction") + public final char getChar(int index) { + if (index >= 0 && index <= this.memory.length - 2) { + return UNSAFE.getChar(this.memory, BASE_OFFSET + index); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final char getCharLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getChar(index); + } else { + return Character.reverseBytes(getChar(index)); + } + } + + public final char getCharBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Character.reverseBytes(getChar(index)); + } else { + return getChar(index); + } + } + + @SuppressWarnings("restriction") + public final void putChar(int index, char value) { + if (index >= 0 && index <= this.memory.length - 2) { + UNSAFE.putChar(this.memory, BASE_OFFSET + index, value); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final void putCharLittleEndian(int index, char value) { + if (LITTLE_ENDIAN) { + putChar(index, value); + } else { + putChar(index, Character.reverseBytes(value)); + } + } + + public final void putCharBigEndian(int index, char value) { + if (LITTLE_ENDIAN) { + putChar(index, Character.reverseBytes(value)); + } else { + putChar(index, value); + } + } + + @SuppressWarnings("restriction") + public final short getShort(int index) { + if (index >= 0 && index <= this.memory.length - 2) { + return UNSAFE.getShort(this.memory, BASE_OFFSET + index); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final short getShortLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getShort(index); + } else { + return Short.reverseBytes(getShort(index)); + } + } + + public final short getShortBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Short.reverseBytes(getShort(index)); + } else { + return getShort(index); + } + } + + @SuppressWarnings("restriction") + public final void putShort(int index, short value) { + if (index >= 0 && index <= this.memory.length - 2) { + UNSAFE.putShort(this.memory, BASE_OFFSET + index, value); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final void putShortLittleEndian(int index, short value) { + if (LITTLE_ENDIAN) { + putShort(index, value); + } else { + putShort(index, Short.reverseBytes(value)); + } + } + + public final void putShortBigEndian(int index, short value) { + if (LITTLE_ENDIAN) { + putShort(index, Short.reverseBytes(value)); + } else { + putShort(index, value); + } + } + + @SuppressWarnings("restriction") + public final int getInt(int index) { + if (index >= 0 && index <= this.memory.length - 4) { + return UNSAFE.getInt(this.memory, BASE_OFFSET + index); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final int getIntLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getInt(index); + } else { + return Integer.reverseBytes(getInt(index)); + } + } + + public final int getIntBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Integer.reverseBytes(getInt(index)); + } else { + return getInt(index); + } + } + + @SuppressWarnings("restriction") + public final void putInt(int index, int value) { + if (index >= 0 && index <= this.memory.length - 4) { + UNSAFE.putInt(this.memory, BASE_OFFSET + index, value); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final void putIntLittleEndian(int index, int value) { + if (LITTLE_ENDIAN) { + putInt(index, value); + } else { + putInt(index, Integer.reverseBytes(value)); + } + } + + public final void putIntBigEndian(int index, int value) { + if (LITTLE_ENDIAN) { + putInt(index, Integer.reverseBytes(value)); + } else { + putInt(index, value); + } + } + + @SuppressWarnings("restriction") + public final long getLong(int index) { + if (index >= 0 && index <= this.memory.length - 8) { + return UNSAFE.getLong(this.memory, BASE_OFFSET + index); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final long getLongLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getLong(index); + } else { + return Long.reverseBytes(getLong(index)); + } + } + + public final long getLongBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Long.reverseBytes(getLong(index)); + } else { + return getLong(index); + } + } + + @SuppressWarnings("restriction") + public final void putLong(int index, long value) { + if (index >= 0 && index <= this.memory.length - 8) { + UNSAFE.putLong(this.memory, BASE_OFFSET + index, value); + } else { + throw new IndexOutOfBoundsException(); + } + } + + public final void putLongLittleEndian(int index, long value) { + if (LITTLE_ENDIAN) { + putLong(index, value); + } else { + putLong(index, Long.reverseBytes(value)); + } + } + + public final void putLongBigEndian(int index, long value) { + if (LITTLE_ENDIAN) { + putLong(index, Long.reverseBytes(value)); + } else { + putLong(index, value); + } + } + + public final float getFloat(int index) { + return Float.intBitsToFloat(getInt(index)); + } + + public final float getFloatLittleEndian(int index) { + return Float.intBitsToFloat(getIntLittleEndian(index)); + } + + public final float getFloatBigEndian(int index) { + return Float.intBitsToFloat(getIntBigEndian(index)); + } + + public final void putFloat(int index, float value) { + putInt(index, Float.floatToRawIntBits(value)); + } + + public final void putFloatLittleEndian(int index, float value) { + putIntLittleEndian(index, Float.floatToRawIntBits(value)); + } + + public final void putFloatBigEndian(int index, float value) { + putIntBigEndian(index, Float.floatToRawIntBits(value)); + } + + public final double getDouble(int index) { + return Double.longBitsToDouble(getLong(index)); + } + + public final double getDoubleLittleEndian(int index) { + return Double.longBitsToDouble(getLongLittleEndian(index)); + } + + public final double getDoubleBigEndian(int index) { + return Double.longBitsToDouble(getLongBigEndian(index)); + } + + public final void putDouble(int index, double value) { + putLong(index, Double.doubleToRawLongBits(value)); + } + + public final void putDoubleLittleEndian(int index, double value) { + putLongLittleEndian(index, Double.doubleToRawLongBits(value)); + } + + public final void putDoubleBigEndian(int index, double value) { + putLongBigEndian(index, Double.doubleToRawLongBits(value)); + } + + // ------------------------------------------------------------------------- + // Bulk Read and Write Methods + // ------------------------------------------------------------------------- + + public final void get(DataOutput out, int offset, int length) throws IOException { + out.write(this.memory, offset, length); + } + + public final void put(DataInput in, int offset, int length) throws IOException { + in.readFully(this.memory, offset, length); + } + + public final void get(int offset, ByteBuffer target, int numBytes) { + // ByteBuffer performs the boundary checks + target.put(this.memory, offset, numBytes); + } + + public final void put(int offset, ByteBuffer source, int numBytes) { + // ByteBuffer performs the boundary checks + source.get(this.memory, offset, numBytes); + } + + public final void copyTo(int offset, PureHeapMemorySegment target, int targetOffset, int numBytes) { + // system arraycopy does the boundary checks anyways, no need to check extra + System.arraycopy(this.memory, offset, target.memory, targetOffset, numBytes); + } + + // ------------------------------------------------------------------------- + // Comparisons & Swapping + // ------------------------------------------------------------------------- + + public final int compare(PureHeapMemorySegment seg2, int offset1, int offset2, int len) { + final byte[] b2 = seg2.memory; + final byte[] b1 = this.memory; + + int val = 0; + for (int pos = 0; pos < len && (val = (b1[offset1 + pos] & 0xff) - (b2[offset2 + pos] & 0xff)) == 0; pos++); + return val; + } + + public final void swapBytes(PureHeapMemorySegment seg2, int offset1, int offset2, int len) { + // swap by bytes (chunks of 8 first, then single bytes) + while (len >= 8) { + long tmp = this.getLong(offset1); + this.putLong(offset1, seg2.getLong(offset2)); + seg2.putLong(offset2, tmp); + offset1 += 8; + offset2 += 8; + len -= 8; + } + while (len > 0) { + byte tmp = this.get(offset1); + this.put(offset1, seg2.get(offset2)); + seg2.put(offset2, tmp); + offset1++; + offset2++; + len--; + } + } + + public final void swapBytes(byte[] auxBuffer, PureHeapMemorySegment seg2, int offset1, int offset2, int len) { + byte[] otherMem = seg2.memory; + System.arraycopy(this.memory, offset1, auxBuffer, 0, len); + System.arraycopy(otherMem, offset2, this.memory, offset1, len); + System.arraycopy(auxBuffer, 0, otherMem, offset2, len); + } + + // -------------------------------------------------------------------------------------------- + // Utilities for native memory accesses and checks + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); +}
http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegmentOutView.java ---------------------------------------------------------------------- diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegmentOutView.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegmentOutView.java new file mode 100644 index 0000000..70d40d5 --- /dev/null +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHeapMemorySegmentOutView.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.benchmark.core.memory.segments; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.util.List; + +public final class PureHeapMemorySegmentOutView implements DataOutputView { + + private PureHeapMemorySegment currentSegment; // the current memory segment to write to + + private int positionInSegment; // the offset in the current segment + + private final int segmentSize; // the size of the memory segments + + private final List<PureHeapMemorySegment> memorySource; + + private final List<PureHeapMemorySegment> fullSegments; + + + private byte[] utfBuffer; // the reusable array for UTF encodings + + + public PureHeapMemorySegmentOutView(List<PureHeapMemorySegment> emptySegments, + List<PureHeapMemorySegment> fullSegmentTarget, int segmentSize) { + this.segmentSize = segmentSize; + this.currentSegment = emptySegments.remove(emptySegments.size() - 1); + + this.memorySource = emptySegments; + this.fullSegments = fullSegmentTarget; + this.fullSegments.add(getCurrentSegment()); + } + + + public void reset() { + if (this.fullSegments.size() != 0) { + throw new IllegalStateException("The target list still contains memory segments."); + } + + clear(); + try { + advance(); + } + catch (IOException ioex) { + throw new RuntimeException("Error getting first segment for record collector.", ioex); + } + } + + // -------------------------------------------------------------------------------------------- + // Page Management + // -------------------------------------------------------------------------------------------- + + public PureHeapMemorySegment nextSegment(PureHeapMemorySegment current, int positionInCurrent) throws EOFException { + int size = this.memorySource.size(); + if (size > 0) { + final PureHeapMemorySegment next = this.memorySource.remove(size - 1); + this.fullSegments.add(next); + return next; + } else { + throw new EOFException(); + } + } + + public PureHeapMemorySegment getCurrentSegment() { + return this.currentSegment; + } + + public int getCurrentPositionInSegment() { + return this.positionInSegment; + } + + public int getSegmentSize() { + return this.segmentSize; + } + + protected void advance() throws IOException { + this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment); + this.positionInSegment = 0; + } + + protected void seekOutput(PureHeapMemorySegment seg, int position) { + this.currentSegment = seg; + this.positionInSegment = position; + } + + protected void clear() { + this.currentSegment = null; + this.positionInSegment = 0; + } + + // -------------------------------------------------------------------------------------------- + // Data Output Specific methods + // -------------------------------------------------------------------------------------------- + + @Override + public void write(int b) throws IOException { + writeByte(b); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + int remaining = this.segmentSize - this.positionInSegment; + if (remaining >= len) { + this.currentSegment.put(this.positionInSegment, b, off, len); + this.positionInSegment += len; + } + else { + if (remaining == 0) { + advance(); + remaining = this.segmentSize - this.positionInSegment; + } + while (true) { + int toPut = Math.min(remaining, len); + this.currentSegment.put(this.positionInSegment, b, off, toPut); + off += toPut; + len -= toPut; + + if (len > 0) { + this.positionInSegment = this.segmentSize; + advance(); + remaining = this.segmentSize - this.positionInSegment; + } + else { + this.positionInSegment += toPut; + break; + } + } + } + } + + @Override + public void writeBoolean(boolean v) throws IOException { + writeByte(v ? 1 : 0); + } + + @Override + public void writeByte(int v) throws IOException { + if (this.positionInSegment < this.segmentSize) { + this.currentSegment.put(this.positionInSegment++, (byte) v); + } + else { + advance(); + writeByte(v); + } + } + + @Override + public void writeShort(int v) throws IOException { + if (this.positionInSegment < this.segmentSize - 1) { + this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v); + this.positionInSegment += 2; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeShort(v); + } + else { + writeByte(v >> 8); + writeByte(v); + } + } + + @Override + public void writeChar(int v) throws IOException { + if (this.positionInSegment < this.segmentSize - 1) { + this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v); + this.positionInSegment += 2; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeChar(v); + } + else { + writeByte(v >> 8); + writeByte(v); + } + } + + @Override + public void writeInt(int v) throws IOException { + if (this.positionInSegment < this.segmentSize - 3) { + this.currentSegment.putIntBigEndian(this.positionInSegment, v); + this.positionInSegment += 4; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeInt(v); + } + else { + writeByte(v >> 24); + writeByte(v >> 16); + writeByte(v >> 8); + writeByte(v); + } + } + + @Override + public void writeLong(long v) throws IOException { + if (this.positionInSegment < this.segmentSize - 7) { + this.currentSegment.putLongBigEndian(this.positionInSegment, v); + this.positionInSegment += 8; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeLong(v); + } + else { + writeByte((int) (v >> 56)); + writeByte((int) (v >> 48)); + writeByte((int) (v >> 40)); + writeByte((int) (v >> 32)); + writeByte((int) (v >> 24)); + writeByte((int) (v >> 16)); + writeByte((int) (v >> 8)); + writeByte((int) v); + } + } + + @Override + public void writeFloat(float v) throws IOException { + writeInt(Float.floatToRawIntBits(v)); + } + + @Override + public void writeDouble(double v) throws IOException { + writeLong(Double.doubleToRawLongBits(v)); + } + + @Override + public void writeBytes(String s) throws IOException { + for (int i = 0; i < s.length(); i++) { + writeByte(s.charAt(i)); + } + } + + @Override + public void writeChars(String s) throws IOException { + for (int i = 0; i < s.length(); i++) { + writeChar(s.charAt(i)); + } + } + + @Override + public void writeUTF(String str) throws IOException { + int strlen = str.length(); + int utflen = 0; + int c, count = 0; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + if (utflen > 65535) { + throw new UTFDataFormatException("encoded string too long: " + utflen + " memory"); + } + + if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) { + this.utfBuffer = new byte[utflen + 2]; + } + final byte[] bytearr = this.utfBuffer; + + bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); + bytearr[count++] = (byte) (utflen & 0xFF); + + int i = 0; + for (i = 0; i < strlen; i++) { + c = str.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + bytearr[count++] = (byte) c; + } + + for (; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (byte) c; + + } else if (c > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | (c & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | (c & 0x3F)); + } + } + + write(bytearr, 0, utflen + 2); + } + + @Override + public void skipBytesToWrite(int numBytes) throws IOException { + while (numBytes > 0) { + final int remaining = this.segmentSize - this.positionInSegment; + if (numBytes <= remaining) { + this.positionInSegment += numBytes; + return; + } + this.positionInSegment = this.segmentSize; + advance(); + numBytes -= remaining; + } + } + + @Override + public void write(DataInputView source, int numBytes) throws IOException { + while (numBytes > 0) { + final int remaining = this.segmentSize - this.positionInSegment; + if (numBytes <= remaining) { + this.currentSegment.put(source, this.positionInSegment, numBytes); + this.positionInSegment += numBytes; + return; + } + + if (remaining > 0) { + this.currentSegment.put(source, this.positionInSegment, remaining); + this.positionInSegment = this.segmentSize; + numBytes -= remaining; + } + + advance(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegment.java ---------------------------------------------------------------------- diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegment.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegment.java new file mode 100644 index 0000000..05c3889 --- /dev/null +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegment.java @@ -0,0 +1,887 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.benchmark.core.memory.segments; + +import org.apache.flink.core.memory.MemoryUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public final class PureHybridMemorySegment { + + /** Constant that flags the byte order. Because this is a boolean constant, + * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */ + private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + + /** The direct byte buffer that allocated the off-heap memory. This memory segment holds a reference + * to that buffer, so as long as this memory segment lives, the memory will not be released. */ + private final ByteBuffer offHeapMemory; + + /** The heap byte array object relative to which we access the memory. Is non-null if the + * memory is on the heap, is null, if the memory if off the heap. If we have this buffer, we + * must never void this reference, or the memory segment will point to undefined addresses + * outside the heap and may in out-of-order execution cases cause segmentation faults. */ + private final byte[] heapMemory; + + /** The address to the data, relative to the heap memory byte array. If the heap memory byte array + * is null, this becomes an absolute memory address outside the heap. */ + private long address; + + /** The address one byte after the last addressable byte. + * This is address + size while the segment is not disposed */ + private final long addressLimit; + + /** The size in bytes of the memory segment */ + private final int size; + + // ------------------------------------------------------------------------- + // Constructors + // ------------------------------------------------------------------------- + + /** + * Creates a new memory segment that represents the memory backing the given direct byte buffer. + * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)}, + * otherwise this method with throw an IllegalArgumentException. + * + * @param buffer The byte buffer whose memory is represented by this memory segment. + * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct. + */ + public PureHybridMemorySegment(ByteBuffer buffer) { + if (buffer == null || !buffer.isDirect()) { + throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer."); + } + + this.offHeapMemory = buffer; + this.heapMemory = null; + this.size = buffer.capacity(); + this.address = getAddress(buffer); + this.addressLimit = this.address + size; + + if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) { + throw new RuntimeException("Segment initialized with too large address: " + address + + " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1)); + } + } + + /** + * Creates a new memory segment that represents the memory of the byte array. + * + * @param buffer The byte array whose memory is represented by this memory segment. + */ + public PureHybridMemorySegment(byte[] buffer) { + if (buffer == null) { + throw new NullPointerException("buffer"); + } + + this.offHeapMemory = null; + this.heapMemory = buffer; + this.address = BYTE_ARRAY_BASE_OFFSET; + this.addressLimit = BYTE_ARRAY_BASE_OFFSET + buffer.length; + this.size = buffer.length; + } + + // ------------------------------------------------------------------------- + // Memory Segment Specifics + // ------------------------------------------------------------------------- + + /** + * Gets the size of the memory segment, in bytes. + * @return The size of the memory segment. + */ + public final int size() { + return size; + } + + /** + * Checks whether the memory segment was freed. + * @return True, if the memory segment has been freed, false otherwise. + */ + public final boolean isFreed() { + return this.address > this.addressLimit; + } + + /** + * Frees this memory segment. After this operation has been called, no further operations are + * possible on the memory segment and will fail. The actual memory (heap or off-heap) will only + * be released after this memory segment object has become garbage collected. + */ + public final void free() { + // this ensures we can place no more data and trigger + // the checks for the freed segment + address = addressLimit + 1; + } + + /** + * Checks whether this memory segment is backed by off-heap memory. + * @return True, if the memory segment is backed by off-heap memory, false if it is backed + * by heap memory. + */ + public final boolean isOffHeap() { + return heapMemory == null; + } + + public byte[] getArray() { + if (heapMemory != null) { + return heapMemory; + } else { + throw new IllegalStateException("Memory segment does not represent heap memory"); + } + } + + /** + * Gets the buffer that owns the memory of this memory segment. + * + * @return The byte buffer that owns the memory of this memory segment. + */ + public ByteBuffer getOffHeapBuffer() { + if (offHeapMemory != null) { + return offHeapMemory; + } else { + throw new IllegalStateException("Memory segment does not represent off heap memory"); + } + } + + public ByteBuffer wrap(int offset, int length) { + if (offset < 0 || offset > this.size || offset > this.size - length) { + throw new IndexOutOfBoundsException(); + } + + if (heapMemory != null) { + return ByteBuffer.wrap(heapMemory, offset, length); + } + else { + ByteBuffer wrapper = offHeapMemory.duplicate(); + wrapper.limit(offset + length); + wrapper.position(offset); + return wrapper; + } + } + + /** + * Gets this memory segment as a pure heap memory segment. + * + * @return A heap memory segment variant of this memory segment. + * @throws UnsupportedOperationException Thrown, if this memory segment is not + * backed by heap memory. + */ + public final PureHeapMemorySegment asHeapSegment() { + if (heapMemory != null) { + return new PureHeapMemorySegment(heapMemory); + } else { + throw new UnsupportedOperationException("Memory segment is not backed by heap memory"); + } + } + + /** + * Gets this memory segment as a pure off-heap memory segment. + * + * @return An off-heap memory segment variant of this memory segment. + * @throws UnsupportedOperationException Thrown, if this memory segment is not + * backed by off-heap memory. + */ + public final PureOffHeapMemorySegment asOffHeapSegment() { + if (offHeapMemory != null) { + return new PureOffHeapMemorySegment(offHeapMemory); + } else { + throw new UnsupportedOperationException("Memory segment is not backed by off-heap memory"); + } + } + + // ------------------------------------------------------------------------ + // Random Access get() and put() methods + // ------------------------------------------------------------------------ + + @SuppressWarnings("restriction") + public final byte get(int index) { + final long pos = address + index; + if (index >= 0 && pos < addressLimit) { + return UNSAFE.getByte(heapMemory, pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + @SuppressWarnings("restriction") + public final void put(int index, byte b) { + final long pos = address + index; + if (index >= 0 && pos < addressLimit) { + UNSAFE.putByte(heapMemory, pos, b); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void get(int index, byte[] dst) { + get(index, dst, 0, dst.length); + } + + public final void put(int index, byte[] src) { + put(index, src, 0, src.length); + } + + @SuppressWarnings("restriction") + public final void get(int index, byte[] dst, int offset, int length) { + // check the byte array offset and length + if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) { + throw new IndexOutOfBoundsException(); + } + + long pos = address + index; + + if (index >= 0 && pos <= addressLimit - length) { + long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; + + // the copy must proceed in batches not too large, because the JVM may + // poll for points that are safe for GC (moving the array and changing its address) + while (length > 0) { + long toCopy = Math.min(length, COPY_PER_BATCH); + UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, toCopy); + length -= toCopy; + pos += toCopy; + arrayAddress += toCopy; + } + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + @SuppressWarnings("restriction") + public final void put(int index, byte[] src, int offset, int length) { + // check the byte array offset and length + if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) { + throw new IndexOutOfBoundsException(); + } + + long pos = address + index; + + if (index >= 0 && pos <= addressLimit - length) { + long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; + while (length > 0) { + long toCopy = Math.min(length, COPY_PER_BATCH); + UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, toCopy); + length -= toCopy; + pos += toCopy; + arrayAddress += toCopy; + } + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final boolean getBoolean(int index) { + return get(index) != 0; + } + + public final void putBoolean(int index, boolean value) { + put(index, (byte) (value ? 1 : 0)); + } + + @SuppressWarnings("restriction") + public final char getChar(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + return UNSAFE.getChar(heapMemory, pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final char getCharLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getChar(index); + } else { + return Character.reverseBytes(getChar(index)); + } + } + + public final char getCharBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Character.reverseBytes(getChar(index)); + } else { + return getChar(index); + } + } + + @SuppressWarnings("restriction") + public final void putChar(int index, char value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + UNSAFE.putChar(heapMemory, pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putCharLittleEndian(int index, char value) { + if (LITTLE_ENDIAN) { + putChar(index, value); + } else { + putChar(index, Character.reverseBytes(value)); + } + } + + public final void putCharBigEndian(int index, char value) { + if (LITTLE_ENDIAN) { + putChar(index, Character.reverseBytes(value)); + } else { + putChar(index, value); + } + } + + @SuppressWarnings("restriction") + public final short getShort(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + return UNSAFE.getShort(heapMemory, pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final short getShortLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getShort(index); + } else { + return Short.reverseBytes(getShort(index)); + } + } + + public final short getShortBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Short.reverseBytes(getShort(index)); + } else { + return getShort(index); + } + } + + @SuppressWarnings("restriction") + public final void putShort(int index, short value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + UNSAFE.putShort(heapMemory, pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putShortLittleEndian(int index, short value) { + if (LITTLE_ENDIAN) { + putShort(index, value); + } else { + putShort(index, Short.reverseBytes(value)); + } + } + + public final void putShortBigEndian(int index, short value) { + if (LITTLE_ENDIAN) { + putShort(index, Short.reverseBytes(value)); + } else { + putShort(index, value); + } + } + + @SuppressWarnings("restriction") + public final int getInt(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 4) { + return UNSAFE.getInt(heapMemory, pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final int getIntLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getInt(index); + } else { + return Integer.reverseBytes(getInt(index)); + } + } + + public final int getIntBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Integer.reverseBytes(getInt(index)); + } else { + return getInt(index); + } + } + + @SuppressWarnings("restriction") + public final void putInt(int index, int value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 4) { + UNSAFE.putInt(heapMemory, pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putIntLittleEndian(int index, int value) { + if (LITTLE_ENDIAN) { + putInt(index, value); + } else { + putInt(index, Integer.reverseBytes(value)); + } + } + + public final void putIntBigEndian(int index, int value) { + if (LITTLE_ENDIAN) { + putInt(index, Integer.reverseBytes(value)); + } else { + putInt(index, value); + } + } + + @SuppressWarnings("restriction") + public final long getLong(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 8) { + return UNSAFE.getLong(heapMemory, pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final long getLongLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getLong(index); + } else { + return Long.reverseBytes(getLong(index)); + } + } + + public final long getLongBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Long.reverseBytes(getLong(index)); + } else { + return getLong(index); + } + } + + @SuppressWarnings("restriction") + public final void putLong(int index, long value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 8) { + UNSAFE.putLong(heapMemory, pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putLongLittleEndian(int index, long value) { + if (LITTLE_ENDIAN) { + putLong(index, value); + } else { + putLong(index, Long.reverseBytes(value)); + } + } + + public final void putLongBigEndian(int index, long value) { + if (LITTLE_ENDIAN) { + putLong(index, Long.reverseBytes(value)); + } else { + putLong(index, value); + } + } + + public final float getFloat(int index) { + return Float.intBitsToFloat(getInt(index)); + } + + public final float getFloatLittleEndian(int index) { + return Float.intBitsToFloat(getIntLittleEndian(index)); + } + + public final float getFloatBigEndian(int index) { + return Float.intBitsToFloat(getIntBigEndian(index)); + } + + public final void putFloat(int index, float value) { + putInt(index, Float.floatToRawIntBits(value)); + } + + public final void putFloatLittleEndian(int index, float value) { + putIntLittleEndian(index, Float.floatToRawIntBits(value)); + } + + public final void putFloatBigEndian(int index, float value) { + putIntBigEndian(index, Float.floatToRawIntBits(value)); + } + + public final double getDouble(int index) { + return Double.longBitsToDouble(getLong(index)); + } + + public final double getDoubleLittleEndian(int index) { + return Double.longBitsToDouble(getLongLittleEndian(index)); + } + + public final double getDoubleBigEndian(int index) { + return Double.longBitsToDouble(getLongBigEndian(index)); + } + + public final void putDouble(int index, double value) { + putLong(index, Double.doubleToRawLongBits(value)); + } + + public final void putDoubleLittleEndian(int index, double value) { + putLongLittleEndian(index, Double.doubleToRawLongBits(value)); + } + + public final void putDoubleBigEndian(int index, double value) { + putLongBigEndian(index, Double.doubleToRawLongBits(value)); + } + + // ------------------------------------------------------------------------- + // Bulk Read and Write Methods + // ------------------------------------------------------------------------- + + public final void get(DataOutput out, int offset, int length) throws IOException { + if (heapMemory != null) { + out.write(heapMemory, offset, length); + } + else { + while (length >= 8) { + out.writeLong(getLongBigEndian(offset)); + offset += 8; + length -= 8; + } + + while (length > 0) { + out.writeByte(get(offset)); + offset++; + length--; + } + } + } + + public final void put(DataInput in, int offset, int length) throws IOException { + if (heapMemory != null) { + in.readFully(heapMemory, offset, length); + } + else { + while (length >= 8) { + putLongBigEndian(offset, in.readLong()); + offset += 8; + length -= 8; + } + while(length > 0) { + put(offset, in.readByte()); + offset++; + length--; + } + } + } + + @SuppressWarnings("restriction") + public final void get(int offset, ByteBuffer target, int numBytes) { + if (heapMemory != null) { + // ByteBuffer performs the boundary checks + target.put(heapMemory, offset, numBytes); + } + else { + // check the byte array offset and length + if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { + throw new IndexOutOfBoundsException(); + } + + final int targetOffset = target.position(); + final int remaining = target.remaining(); + + if (remaining < numBytes) { + throw new BufferOverflowException(); + } + + if (target.isDirect()) { + // copy to the target memory directly + final long targetPointer = getAddress(target) + targetOffset; + final long sourcePointer = address + offset; + + if (sourcePointer <= addressLimit - numBytes) { + UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + throw new IndexOutOfBoundsException(); + } + } + else if (target.hasArray()) { + // move directly into the byte array + get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes); + + // this must be after the get() call to ensue that the byte buffer is not + // modified in case the call fails + target.position(targetOffset + numBytes); + } + else { + // neither heap buffer nor direct buffer + while (target.hasRemaining()) { + target.put(get(offset++)); + } + } + } + } + + @SuppressWarnings("restriction") + public final void put(int offset, ByteBuffer source, int numBytes) { + if (heapMemory != null) { + source.get(heapMemory, offset, numBytes); + } + else { + // check the byte array offset and length + if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { + throw new IndexOutOfBoundsException(); + } + + final int sourceOffset = source.position(); + final int remaining = source.remaining(); + + if (remaining < numBytes) { + throw new BufferUnderflowException(); + } + + if (source.isDirect()) { + // copy to the target memory directly + final long sourcePointer = getAddress(source) + sourceOffset; + final long targetPointer = address + offset; + + if (sourcePointer <= addressLimit - numBytes) { + UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + throw new IndexOutOfBoundsException(); + } + } + else if (source.hasArray()) { + // move directly into the byte array + put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes); + + // this must be after the get() call to ensue that the byte buffer is not + // modified in case the call fails + source.position(sourceOffset + numBytes); + } + else { + // neither heap buffer nor direct buffer + while (source.hasRemaining()) { + put(offset++, source.get()); + } + } + } + } + + @SuppressWarnings("restriction") + public final void copyTo(int offset, PureHybridMemorySegment target, int targetOffset, int numBytes) { + final byte[] thisHeapRef = this.heapMemory; + final byte[] otherHeapRef = target.heapMemory; + final long thisPointer = this.address + offset; + final long otherPointer = target.address + targetOffset; + + if (numBytes >= 0 & thisPointer <= this.addressLimit - numBytes & otherPointer <= target.addressLimit - numBytes) { + UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes); + } + else if (address > addressLimit | target.address > target.addressLimit) { + throw new IllegalStateException("segment has been freed."); + } + else { + throw new IndexOutOfBoundsException(); + } + } + + public int compare(PureHybridMemorySegment seg2, int offset1, int offset2, int len) { + while (len >= 8) { + long l1 = this.getLongBigEndian(offset1); + long l2 = seg2.getLongBigEndian(offset2); + + if (l1 != l2) { + return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1; + } + + offset1 += 8; + offset2 += 8; + len -= 8; + } + while (len > 0) { + int b1 = this.get(offset1) & 0xff; + int b2 = seg2.get(offset2) & 0xff; + int cmp = b1 - b2; + if (cmp != 0) { + return cmp; + } + offset1++; + offset2++; + len--; + } + return 0; + } + + public void swapBytes(byte[] tempBuffer, PureHybridMemorySegment seg2, int offset1, int offset2, int len) { + if (len < 32) { + // fast path for short copies + while (len >= 8) { + long tmp = this.getLong(offset1); + this.putLong(offset1, seg2.getLong(offset2)); + seg2.putLong(offset2, tmp); + offset1 += 8; + offset2 += 8; + len -= 8; + } + while (len > 0) { + byte tmp = this.get(offset1); + this.put(offset1, seg2.get(offset2)); + seg2.put(offset2, tmp); + offset1++; + offset2++; + len--; + } + } + else if ( (offset1 | offset2 | len | (offset1 + len) | (offset2 + len) | + (this.size - (offset1 + len)) | (seg2.size() - (offset2 + len))) < 0 || len > tempBuffer.length) + { + throw new IndexOutOfBoundsException(); + } + else { + final long thisPos = this.address + offset1; + final long otherPos = seg2.address + offset2; + + if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) { + final long arrayAddress = BYTE_ARRAY_BASE_OFFSET; + + // this -> temp buffer + UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, arrayAddress, len); + + // other -> this + UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len); + + // temp buffer -> other + UNSAFE.copyMemory(tempBuffer, arrayAddress, seg2.heapMemory, otherPos, len); + } + else if (this.address <= 0 || seg2.address <= 0) { + throw new IllegalStateException("Memory segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Utilities for native memory accesses and checks + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final long COPY_PER_BATCH = 1024 * 1024; + + private static final Field ADDRESS_FIELD; + + static { + try { + ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address"); + ADDRESS_FIELD.setAccessible(true); + } + catch (Throwable t) { + throw new RuntimeException("Cannot initialize DirectMemorySegment - direct memory not supported by the JVM."); + } + } + + private static long getAddress(ByteBuffer buf) { + try { + return (Long) ADDRESS_FIELD.get(buf); + } + catch (Throwable t) { + throw new RuntimeException("Could not access direct byte buffer address.", t); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegmentOutView.java ---------------------------------------------------------------------- diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegmentOutView.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegmentOutView.java new file mode 100644 index 0000000..65bfe6b --- /dev/null +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureHybridMemorySegmentOutView.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.benchmark.core.memory.segments; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.util.List; + +public final class PureHybridMemorySegmentOutView implements DataOutputView { + + private PureHybridMemorySegment currentSegment; // the current memory segment to write to + + private int positionInSegment; // the offset in the current segment + + private final int segmentSize; // the size of the memory segments + + private final List<PureHybridMemorySegment> memorySource; + + private final List<PureHybridMemorySegment> fullSegments; + + + private byte[] utfBuffer; // the reusable array for UTF encodings + + + public PureHybridMemorySegmentOutView(List<PureHybridMemorySegment> emptySegments, + List<PureHybridMemorySegment> fullSegmentTarget, int segmentSize) { + this.segmentSize = segmentSize; + this.currentSegment = emptySegments.remove(emptySegments.size() - 1); + + this.memorySource = emptySegments; + this.fullSegments = fullSegmentTarget; + this.fullSegments.add(getCurrentSegment()); + } + + + public void reset() { + if (this.fullSegments.size() != 0) { + throw new IllegalStateException("The target list still contains memory segments."); + } + + clear(); + try { + advance(); + } + catch (IOException ioex) { + throw new RuntimeException("Error getting first segment for record collector.", ioex); + } + } + + // -------------------------------------------------------------------------------------------- + // Page Management + // -------------------------------------------------------------------------------------------- + + public PureHybridMemorySegment nextSegment(PureHybridMemorySegment current, int positionInCurrent) throws EOFException { + int size = this.memorySource.size(); + if (size > 0) { + final PureHybridMemorySegment next = this.memorySource.remove(size - 1); + this.fullSegments.add(next); + return next; + } else { + throw new EOFException(); + } + } + + public PureHybridMemorySegment getCurrentSegment() { + return this.currentSegment; + } + + public int getCurrentPositionInSegment() { + return this.positionInSegment; + } + + public int getSegmentSize() { + return this.segmentSize; + } + + protected void advance() throws IOException { + this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment); + this.positionInSegment = 0; + } + + protected void seekOutput(PureHybridMemorySegment seg, int position) { + this.currentSegment = seg; + this.positionInSegment = position; + } + + protected void clear() { + this.currentSegment = null; + this.positionInSegment = 0; + } + + // -------------------------------------------------------------------------------------------- + // Data Output Specific methods + // -------------------------------------------------------------------------------------------- + + @Override + public void write(int b) throws IOException { + writeByte(b); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + int remaining = this.segmentSize - this.positionInSegment; + if (remaining >= len) { + this.currentSegment.put(this.positionInSegment, b, off, len); + this.positionInSegment += len; + } + else { + if (remaining == 0) { + advance(); + remaining = this.segmentSize - this.positionInSegment; + } + while (true) { + int toPut = Math.min(remaining, len); + this.currentSegment.put(this.positionInSegment, b, off, toPut); + off += toPut; + len -= toPut; + + if (len > 0) { + this.positionInSegment = this.segmentSize; + advance(); + remaining = this.segmentSize - this.positionInSegment; + } + else { + this.positionInSegment += toPut; + break; + } + } + } + } + + @Override + public void writeBoolean(boolean v) throws IOException { + writeByte(v ? 1 : 0); + } + + @Override + public void writeByte(int v) throws IOException { + if (this.positionInSegment < this.segmentSize) { + this.currentSegment.put(this.positionInSegment++, (byte) v); + } + else { + advance(); + writeByte(v); + } + } + + @Override + public void writeShort(int v) throws IOException { + if (this.positionInSegment < this.segmentSize - 1) { + this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v); + this.positionInSegment += 2; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeShort(v); + } + else { + writeByte(v >> 8); + writeByte(v); + } + } + + @Override + public void writeChar(int v) throws IOException { + if (this.positionInSegment < this.segmentSize - 1) { + this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v); + this.positionInSegment += 2; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeChar(v); + } + else { + writeByte(v >> 8); + writeByte(v); + } + } + + @Override + public void writeInt(int v) throws IOException { + if (this.positionInSegment < this.segmentSize - 3) { + this.currentSegment.putIntBigEndian(this.positionInSegment, v); + this.positionInSegment += 4; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeInt(v); + } + else { + writeByte(v >> 24); + writeByte(v >> 16); + writeByte(v >> 8); + writeByte(v); + } + } + + @Override + public void writeLong(long v) throws IOException { + if (this.positionInSegment < this.segmentSize - 7) { + this.currentSegment.putLongBigEndian(this.positionInSegment, v); + this.positionInSegment += 8; + } + else if (this.positionInSegment == this.segmentSize) { + advance(); + writeLong(v); + } + else { + writeByte((int) (v >> 56)); + writeByte((int) (v >> 48)); + writeByte((int) (v >> 40)); + writeByte((int) (v >> 32)); + writeByte((int) (v >> 24)); + writeByte((int) (v >> 16)); + writeByte((int) (v >> 8)); + writeByte((int) v); + } + } + + @Override + public void writeFloat(float v) throws IOException { + writeInt(Float.floatToRawIntBits(v)); + } + + @Override + public void writeDouble(double v) throws IOException { + writeLong(Double.doubleToRawLongBits(v)); + } + + @Override + public void writeBytes(String s) throws IOException { + for (int i = 0; i < s.length(); i++) { + writeByte(s.charAt(i)); + } + } + + @Override + public void writeChars(String s) throws IOException { + for (int i = 0; i < s.length(); i++) { + writeChar(s.charAt(i)); + } + } + + @Override + public void writeUTF(String str) throws IOException { + int strlen = str.length(); + int utflen = 0; + int c, count = 0; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + if (utflen > 65535) { + throw new UTFDataFormatException("encoded string too long: " + utflen + " memory"); + } + + if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) { + this.utfBuffer = new byte[utflen + 2]; + } + final byte[] bytearr = this.utfBuffer; + + bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); + bytearr[count++] = (byte) (utflen & 0xFF); + + int i = 0; + for (i = 0; i < strlen; i++) { + c = str.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + bytearr[count++] = (byte) c; + } + + for (; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (byte) c; + + } else if (c > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | (c & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | (c & 0x3F)); + } + } + + write(bytearr, 0, utflen + 2); + } + + @Override + public void skipBytesToWrite(int numBytes) throws IOException { + while (numBytes > 0) { + final int remaining = this.segmentSize - this.positionInSegment; + if (numBytes <= remaining) { + this.positionInSegment += numBytes; + return; + } + this.positionInSegment = this.segmentSize; + advance(); + numBytes -= remaining; + } + } + + @Override + public void write(DataInputView source, int numBytes) throws IOException { + while (numBytes > 0) { + final int remaining = this.segmentSize - this.positionInSegment; + if (numBytes <= remaining) { + this.currentSegment.put(source, this.positionInSegment, numBytes); + this.positionInSegment += numBytes; + return; + } + + if (remaining > 0) { + this.currentSegment.put(source, this.positionInSegment, remaining); + this.positionInSegment = this.segmentSize; + numBytes -= remaining; + } + + advance(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureOffHeapMemorySegment.java ---------------------------------------------------------------------- diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureOffHeapMemorySegment.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureOffHeapMemorySegment.java new file mode 100644 index 0000000..7032c5e --- /dev/null +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/PureOffHeapMemorySegment.java @@ -0,0 +1,790 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.benchmark.core.memory.segments; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemoryUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public final class PureOffHeapMemorySegment { + + /** Constant that flags the byte order. Because this is a boolean constant, + * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */ + private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + + /** The direct byte buffer that allocated the memory */ + private ByteBuffer buffer; + + /** The address to the off-heap data */ + private long address; + + /** The address one byte after the last addressable byte. + * This is address + size while the segment is not disposed */ + private final long addressLimit; + + /** The size in bytes of the memory segment */ + private final int size; + + // ------------------------------------------------------------------------- + // Constructors + // ------------------------------------------------------------------------- + + /** + * Creates a new memory segment that represents the memory backing the given direct byte buffer. + * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)}, + * otherwise this method with throw an IllegalArgumentException. data in the given byte array. + * + * @param buffer The byte buffer whose memory is represented by this memory segment. + * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct. + */ + public PureOffHeapMemorySegment(ByteBuffer buffer) { + if (buffer == null || !buffer.isDirect()) { + throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer."); + } + + this.buffer = buffer; + this.size = buffer.capacity(); + this.address = getAddress(buffer); + this.addressLimit = this.address + size; + + if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) { + throw new RuntimeException("Segment initialized with too large address: " + address); + } + } + + // ------------------------------------------------------------------------- + // Direct Memory Segment Specifics + // ------------------------------------------------------------------------- + + /** + * Gets the buffer that owns the memory of this memory segment. + * + * @return The byte buffer that owns the memory of this memory segment. + */ + public ByteBuffer getBuffer() { + return this.buffer; + } + + /** + * Gets the memory address of the memory backing this memory segment. + * + * @return The memory start address of the memory backing this memory segment. + */ + public long getAddress() { + return address; + } + + // ------------------------------------------------------------------------- + // MemorySegment Accessors + // ------------------------------------------------------------------------- + + public final boolean isFreed() { + return this.address > this.addressLimit; + } + + public final void free() { + // this ensures we can place no more data and trigger + // the checks for the freed segment + this.address = this.addressLimit + 1; + this.buffer = null; + } + + public final int size() { + return this.size; + } + + public ByteBuffer wrap(int offset, int length) { + if (offset < 0 || offset > this.size || offset > this.size - length) { + throw new IndexOutOfBoundsException(); + } + + this.buffer.limit(offset + length); + this.buffer.position(offset); + + return this.buffer; + } + + + // ------------------------------------------------------------------------ + // Random Access get() and put() methods + // ------------------------------------------------------------------------ + + @SuppressWarnings("restriction") + public final byte get(int index) { + + final long pos = address + index; + if (index >= 0 && pos < addressLimit) { + return UNSAFE.getByte(pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + @SuppressWarnings("restriction") + public final void put(int index, byte b) { + + final long pos = address + index; + if (index >= 0 && pos < addressLimit) { + UNSAFE.putByte(pos, b); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void get(int index, byte[] dst) { + get(index, dst, 0, dst.length); + } + + public final void put(int index, byte[] src) { + put(index, src, 0, src.length); + } + + @SuppressWarnings("restriction") + public final void get(int index, byte[] dst, int offset, int length) { + + // check the byte array offset and length + if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) { + throw new IndexOutOfBoundsException(); + } + + long pos = address + index; + + if (index >= 0 && pos <= addressLimit - length) { + long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; + + // the copy must proceed in batches not too large, because the JVM may + // poll for points that are safe for GC (moving the array and changing its address) + while (length > 0) { + long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length; + UNSAFE.copyMemory(null, pos, dst, arrayAddress, toCopy); + length -= toCopy; + pos += toCopy; + arrayAddress += toCopy; + } + } + else if (address <= 0) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + @SuppressWarnings("restriction") + public final void put(int index, byte[] src, int offset, int length) { + // check the byte array offset and length + if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) { + throw new IndexOutOfBoundsException(); + } + + long pos = address + index; + + if (index >= 0 && pos <= addressLimit - length) { + + long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; + while (length > 0) { + long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length; + UNSAFE.copyMemory(src, arrayAddress, null, pos, toCopy); + length -= toCopy; + pos += toCopy; + arrayAddress += toCopy; + } + } + else if (address <= 0) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final boolean getBoolean(int index) { + return get(index) != 0; + } + + public final void putBoolean(int index, boolean value) { + put(index, (byte) (value ? 1 : 0)); + } + + @SuppressWarnings("restriction") + public final char getChar(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + return UNSAFE.getChar(pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final char getCharLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getChar(index); + } else { + return Character.reverseBytes(getChar(index)); + } + } + + public final char getCharBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Character.reverseBytes(getChar(index)); + } else { + return getChar(index); + } + } + + @SuppressWarnings("restriction") + public final void putChar(int index, char value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + UNSAFE.putChar(pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putCharLittleEndian(int index, char value) { + if (LITTLE_ENDIAN) { + putChar(index, value); + } else { + putChar(index, Character.reverseBytes(value)); + } + } + + public final void putCharBigEndian(int index, char value) { + if (LITTLE_ENDIAN) { + putChar(index, Character.reverseBytes(value)); + } else { + putChar(index, value); + } + } + + @SuppressWarnings("restriction") + public final short getShort(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + return UNSAFE.getShort(pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final short getShortLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getShort(index); + } else { + return Short.reverseBytes(getShort(index)); + } + } + + public final short getShortBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Short.reverseBytes(getShort(index)); + } else { + return getShort(index); + } + } + + @SuppressWarnings("restriction") + public final void putShort(int index, short value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { + UNSAFE.putShort(pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putShortLittleEndian(int index, short value) { + if (LITTLE_ENDIAN) { + putShort(index, value); + } else { + putShort(index, Short.reverseBytes(value)); + } + } + + public final void putShortBigEndian(int index, short value) { + if (LITTLE_ENDIAN) { + putShort(index, Short.reverseBytes(value)); + } else { + putShort(index, value); + } + } + + @SuppressWarnings("restriction") + public final int getInt(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 4) { + return UNSAFE.getInt(pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final int getIntLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getInt(index); + } else { + return Integer.reverseBytes(getInt(index)); + } + } + + public final int getIntBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Integer.reverseBytes(getInt(index)); + } else { + return getInt(index); + } + } + + @SuppressWarnings("restriction") + public final void putInt(int index, int value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 4) { + UNSAFE.putInt(pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putIntLittleEndian(int index, int value) { + if (LITTLE_ENDIAN) { + putInt(index, value); + } else { + putInt(index, Integer.reverseBytes(value)); + } + } + + public final void putIntBigEndian(int index, int value) { + if (LITTLE_ENDIAN) { + putInt(index, Integer.reverseBytes(value)); + } else { + putInt(index, value); + } + } + + @SuppressWarnings("restriction") + public final long getLong(int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 8) { + return UNSAFE.getLong(pos); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final long getLongLittleEndian(int index) { + if (LITTLE_ENDIAN) { + return getLong(index); + } else { + return Long.reverseBytes(getLong(index)); + } + } + + public final long getLongBigEndian(int index) { + if (LITTLE_ENDIAN) { + return Long.reverseBytes(getLong(index)); + } else { + return getLong(index); + } + } + + @SuppressWarnings("restriction") + public final void putLong(int index, long value) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 8) { + UNSAFE.putLong(pos, value); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + + public final void putLongLittleEndian(int index, long value) { + if (LITTLE_ENDIAN) { + putLong(index, value); + } else { + putLong(index, Long.reverseBytes(value)); + } + } + + public final void putLongBigEndian(int index, long value) { + if (LITTLE_ENDIAN) { + putLong(index, Long.reverseBytes(value)); + } else { + putLong(index, value); + } + } + + public final float getFloat(int index) { + return Float.intBitsToFloat(getInt(index)); + } + + public final float getFloatLittleEndian(int index) { + return Float.intBitsToFloat(getIntLittleEndian(index)); + } + + public final float getFloatBigEndian(int index) { + return Float.intBitsToFloat(getIntBigEndian(index)); + } + + public final void putFloat(int index, float value) { + putInt(index, Float.floatToRawIntBits(value)); + } + + public final void putFloatLittleEndian(int index, float value) { + putIntLittleEndian(index, Float.floatToRawIntBits(value)); + } + + public final void putFloatBigEndian(int index, float value) { + putIntBigEndian(index, Float.floatToRawIntBits(value)); + } + + public final double getDouble(int index) { + return Double.longBitsToDouble(getLong(index)); + } + + public final double getDoubleLittleEndian(int index) { + return Double.longBitsToDouble(getLongLittleEndian(index)); + } + + public final double getDoubleBigEndian(int index) { + return Double.longBitsToDouble(getLongBigEndian(index)); + } + + public final void putDouble(int index, double value) { + putLong(index, Double.doubleToRawLongBits(value)); + } + + public final void putDoubleLittleEndian(int index, double value) { + putLongLittleEndian(index, Double.doubleToRawLongBits(value)); + } + + public final void putDoubleBigEndian(int index, double value) { + putLongBigEndian(index, Double.doubleToRawLongBits(value)); + } + + // ------------------------------------------------------------------------- + // Bulk Read and Write Methods + // ------------------------------------------------------------------------- + + public final void get(DataOutput out, int offset, int length) throws IOException { + while (length >= 8) { + out.writeLong(getLongBigEndian(offset)); + offset += 8; + length -= 8; + } + + while(length > 0) { + out.writeByte(get(offset)); + offset++; + length--; + } + } + + public final void put(DataInput in, int offset, int length) throws IOException { + while (length >= 8) { + putLongBigEndian(offset, in.readLong()); + offset += 8; + length -= 8; + } + while(length > 0) { + put(offset, in.readByte()); + offset++; + length--; + } + } + + @SuppressWarnings("restriction") + public final void get(int offset, ByteBuffer target, int numBytes) { + + // check the byte array offset and length + if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { + throw new IndexOutOfBoundsException(); + } + + final int targetOffset = target.position(); + final int remaining = target.remaining(); + + if (remaining < numBytes) { + throw new BufferOverflowException(); + } + + if (target.isDirect()) { + // copy to the target memory directly + final long targetPointer = getAddress(target) + targetOffset; + final long sourcePointer = address + offset; + + if (sourcePointer <= addressLimit - numBytes) { + UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + throw new IndexOutOfBoundsException(); + } + } + else if (target.hasArray()) { + // move directly into the byte array + get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes); + + // this must be after the get() call to ensue that the byte buffer is not + // modified in case the call fails + target.position(targetOffset + numBytes); + } + else { + // neither heap buffer nor direct buffer + while (target.hasRemaining()) { + target.put(get(offset++)); + } + } + } + + @SuppressWarnings("restriction") + public final void put(int offset, ByteBuffer source, int numBytes) { + + // check the byte array offset and length + if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { + throw new IndexOutOfBoundsException(); + } + + final int sourceOffset = source.position(); + final int remaining = source.remaining(); + + if (remaining < numBytes) { + throw new BufferUnderflowException(); + } + + if (source.isDirect()) { + // copy to the target memory directly + final long sourcePointer = getAddress(source) + sourceOffset; + final long targetPointer = address + offset; + + if (sourcePointer <= addressLimit - numBytes) { + UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); + } + else if (address > addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + throw new IndexOutOfBoundsException(); + } + } + else if (source.hasArray()) { + // move directly into the byte array + put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes); + + // this must be after the get() call to ensue that the byte buffer is not + // modified in case the call fails + source.position(sourceOffset + numBytes); + } + else { + // neither heap buffer nor direct buffer + while (source.hasRemaining()) { + put(offset++, source.get()); + } + } + } + + @SuppressWarnings("restriction") + public final void copyTo(int offset, PureOffHeapMemorySegment target, int targetOffset, int numBytes) { + final long thisPointer = address + offset; + final long otherPointer = target.address + targetOffset; + + if (numBytes >= 0 && thisPointer <= addressLimit - numBytes && otherPointer <= target.addressLimit - numBytes) { + UNSAFE.copyMemory(thisPointer, otherPointer, numBytes); + } + else if (address > addressLimit || target.address > target.addressLimit) { + throw new IllegalStateException("This segment has been freed."); + } + else { + throw new IndexOutOfBoundsException(); + } + } + + public int compare(MemorySegment seg2, int offset1, int offset2, int len) { + while (len >= 8) { + long l1 = this.getLongBigEndian(offset1); + long l2 = seg2.getLongBigEndian(offset2); + + if (l1 != l2) { + return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1; + } + + offset1 += 8; + offset2 += 8; + len -= 8; + } + while (len > 0) { + int b1 = this.get(offset1) & 0xff; + int b2 = seg2.get(offset2) & 0xff; + int cmp = b1 - b2; + if (cmp != 0) { + return cmp; + } + offset1++; + offset2++; + len--; + } + return 0; + } + + public void swapBytes(byte[] tempBuffer, PureOffHeapMemorySegment seg2, int offset1, int offset2, int len) { + if (len < 32) { + // fast path for short copies + while (len >= 8) { + long tmp = this.getLong(offset1); + this.putLong(offset1, seg2.getLong(offset2)); + seg2.putLong(offset2, tmp); + offset1 += 8; + offset2 += 8; + len -= 8; + } + while (len > 0) { + byte tmp = this.get(offset1); + this.put(offset1, seg2.get(offset2)); + seg2.put(offset2, tmp); + offset1++; + offset2++; + len--; + } + } + else if ( (offset1 | offset2 | len | (offset1 + len) | (offset2 + len) | + (this.size - (offset1 + len)) | (seg2.size() - (offset2 + len))) < 0 || len > tempBuffer.length) + { + throw new IndexOutOfBoundsException(); + } + else { + final long thisPos = this.address + offset1; + final long otherPos = seg2.address + offset2; + + if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) { + final long arrayAddress = BYTE_ARRAY_BASE_OFFSET; + + // this -> temp buffer + UNSAFE.copyMemory(null, thisPos, tempBuffer, arrayAddress, len); + + // other -> this + UNSAFE.copyMemory(null, otherPos, null, thisPos, len); + + // temp buffer -> other + UNSAFE.copyMemory(tempBuffer, arrayAddress, null, otherPos, len); + } + else if (this.address <= 0 || seg2.address <= 0) { + throw new IllegalStateException("Memory segment has been freed."); + } + else { + // index is in fact invalid + throw new IndexOutOfBoundsException(); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Utilities for native memory accesses and checks + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final long COPY_PER_BATCH = 1024 * 1024; + + private static final Field ADDRESS_FIELD; + + static { + try { + ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address"); + ADDRESS_FIELD.setAccessible(true); + } + catch (Throwable t) { + throw new RuntimeException("Cannot initialize DirectMemorySegment - direct memory not supported by the JVM."); + } + } + + private static long getAddress(ByteBuffer buf) { + try { + return (Long) ADDRESS_FIELD.get(buf); + } + catch (Throwable t) { + throw new RuntimeException("Could not access direct byte buffer address.", t); + } + } +}
