// http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java deleted file mode 100644 index e247eed..0000000 --- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java +++ /dev/null @@ -1,466 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.core.memory.benchmarks;

import org.apache.flink.core.memory.MemoryUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * A memory segment from the memory benchmarks test package whose data lives in a
 * single heap byte array.
 *
 * <p>Single-byte and bulk accesses go through plain array operations; multi-byte
 * primitive accesses (char, short, int, long) are bounds-checked explicitly and then
 * performed through {@code sun.misc.Unsafe} relative to the array base offset.
 *
 * <p>After {@link #free()} the reference to the array is cleared, so data accessors
 * that dereference the array fail with a {@link NullPointerException}.
 */
public final class PureHeapMemorySegment {

    /**
     * Constant that flags the byte order. Because this is a boolean constant, the JIT
     * compiler can use it to aggressively eliminate the non-applicable code paths.
     */
    private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);

    /** The array in which the data is stored; cleared by {@link #free()}. */
    private byte[] memory;

    /** Wrapper for I/O requests; created on the first {@link #wrap(int, int)} call and reused. */
    private ByteBuffer wrapper;

    /** The size, stored separately because the reference to the byte array may be cleared. */
    private final int size;

    // -------------------------------------------------------------------------
    //  Constructors
    // -------------------------------------------------------------------------

    /**
     * Creates a new memory segment that represents the data in the given byte array.
     *
     * @param memory The byte array that holds the data.
     * @throws NullPointerException Thrown, if the given array is null.
     */
    public PureHeapMemorySegment(byte[] memory) {
        this.memory = memory;
        this.size = memory.length;
    }

    // -------------------------------------------------------------------------
    //  Heap Memory Segment Specifics
    // -------------------------------------------------------------------------

    /**
     * Gets the byte array that backs this memory segment.
     *
     * @return The byte array that backs this memory segment, or null after {@link #free()}.
     */
    public byte[] getArray() {
        return this.memory;
    }

    // -------------------------------------------------------------------------
    //  MemorySegment Accessors
    // -------------------------------------------------------------------------

    /**
     * Checks whether this segment has been freed.
     *
     * @return True, if the backing array reference has been cleared, false otherwise.
     */
    public final boolean isFreed() {
        return this.memory == null;
    }

    /** Frees this segment by dropping the references to the cached wrapper and the backing array. */
    public final void free() {
        this.wrapper = null;
        this.memory = null;
    }

    /**
     * Gets the size of the segment, which remains available after {@link #free()}.
     *
     * @return The length of the byte array this segment was created with.
     */
    public final int size() {
        return this.size;
    }

    /**
     * Exposes a region of this segment as a {@link ByteBuffer} with position {@code offset}
     * and limit {@code offset + length}.
     *
     * <p>The same buffer instance is reused across calls, so a previously returned buffer is
     * repositioned by the next call.
     *
     * @param offset The start of the region within the segment.
     * @param length The number of bytes in the region.
     * @return The (shared) byte buffer positioned on the requested region.
     * @throws IndexOutOfBoundsException Thrown, if offset or length are negative, or if the
     *                                   region exceeds the segment.
     */
    public final ByteBuffer wrap(int offset, int length) {
        // Reject negative arguments up front: otherwise the first call fails inside
        // ByteBuffer.wrap with an IndexOutOfBoundsException, while later calls on the cached
        // wrapper fail in limit()/position() with an IllegalArgumentException.
        if (offset < 0 || length < 0 || offset > this.memory.length - length) {
            throw new IndexOutOfBoundsException();
        }

        if (this.wrapper == null) {
            this.wrapper = ByteBuffer.wrap(this.memory, offset, length);
        } else {
            // set the limit first, so that the new position is never beyond the limit
            this.wrapper.limit(offset + length);
            this.wrapper.position(offset);
        }
        return this.wrapper;
    }

    // ------------------------------------------------------------------------
    //  Random Access get() and put() methods
    // ------------------------------------------------------------------------

    /** Reads the byte at the given index; bounds are checked by the array access. */
    public final byte get(int index) {
        return this.memory[index];
    }

    /** Writes the byte at the given index; bounds are checked by the array access. */
    public final void put(int index, byte b) {
        this.memory[index] = b;
    }

    /** Fills the entire destination array with bytes starting at the given segment index. */
    public final void get(int index, byte[] dst) {
        get(index, dst, 0, dst.length);
    }

    /** Copies the entire source array into the segment starting at the given segment index. */
    public final void put(int index, byte[] src) {
        put(index, src, 0, src.length);
    }

    /** Copies {@code length} bytes from the segment at {@code index} into {@code dst} at {@code offset}. */
    public final void get(int index, byte[] dst, int offset, int length) {
        // System.arraycopy performs the boundary checks, no extra check needed
        System.arraycopy(this.memory, index, dst, offset, length);
    }

    /** Copies {@code length} bytes from {@code src} at {@code offset} into the segment at {@code index}. */
    public final void put(int index, byte[] src, int offset, int length) {
        // System.arraycopy performs the boundary checks, no extra check needed
        System.arraycopy(src, offset, this.memory, index, length);
    }

    /** Reads a boolean: any non-zero byte is interpreted as true. */
    public final boolean getBoolean(int index) {
        return this.memory[index] != 0;
    }

    /** Writes a boolean as a single byte, 1 for true and 0 for false. */
    public final void putBoolean(int index, boolean value) {
        this.memory[index] = (byte) (value ? 1 : 0);
    }

    /**
     * Reads a char (two bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than two bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final char getChar(int index) {
        if (index < 0 || index > this.memory.length - 2) {
            throw new IndexOutOfBoundsException();
        }
        return UNSAFE.getChar(this.memory, BASE_OFFSET + index);
    }

    /** Reads a char in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final char getCharLittleEndian(int index) {
        return LITTLE_ENDIAN ? getChar(index) : Character.reverseBytes(getChar(index));
    }

    /** Reads a char in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final char getCharBigEndian(int index) {
        return LITTLE_ENDIAN ? Character.reverseBytes(getChar(index)) : getChar(index);
    }

    /**
     * Writes a char (two bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than two bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final void putChar(int index, char value) {
        if (index < 0 || index > this.memory.length - 2) {
            throw new IndexOutOfBoundsException();
        }
        UNSAFE.putChar(this.memory, BASE_OFFSET + index, value);
    }

    /** Writes a char in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final void putCharLittleEndian(int index, char value) {
        putChar(index, LITTLE_ENDIAN ? value : Character.reverseBytes(value));
    }

    /** Writes a char in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final void putCharBigEndian(int index, char value) {
        putChar(index, LITTLE_ENDIAN ? Character.reverseBytes(value) : value);
    }

    /**
     * Reads a short (two bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than two bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final short getShort(int index) {
        if (index < 0 || index > this.memory.length - 2) {
            throw new IndexOutOfBoundsException();
        }
        return UNSAFE.getShort(this.memory, BASE_OFFSET + index);
    }

    /** Reads a short in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final short getShortLittleEndian(int index) {
        return LITTLE_ENDIAN ? getShort(index) : Short.reverseBytes(getShort(index));
    }

    /** Reads a short in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final short getShortBigEndian(int index) {
        return LITTLE_ENDIAN ? Short.reverseBytes(getShort(index)) : getShort(index);
    }

    /**
     * Writes a short (two bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than two bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final void putShort(int index, short value) {
        if (index < 0 || index > this.memory.length - 2) {
            throw new IndexOutOfBoundsException();
        }
        UNSAFE.putShort(this.memory, BASE_OFFSET + index, value);
    }

    /** Writes a short in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final void putShortLittleEndian(int index, short value) {
        putShort(index, LITTLE_ENDIAN ? value : Short.reverseBytes(value));
    }

    /** Writes a short in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final void putShortBigEndian(int index, short value) {
        putShort(index, LITTLE_ENDIAN ? Short.reverseBytes(value) : value);
    }

    /**
     * Reads an int (four bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than four bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final int getInt(int index) {
        if (index < 0 || index > this.memory.length - 4) {
            throw new IndexOutOfBoundsException();
        }
        return UNSAFE.getInt(this.memory, BASE_OFFSET + index);
    }

    /** Reads an int in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final int getIntLittleEndian(int index) {
        return LITTLE_ENDIAN ? getInt(index) : Integer.reverseBytes(getInt(index));
    }

    /** Reads an int in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final int getIntBigEndian(int index) {
        return LITTLE_ENDIAN ? Integer.reverseBytes(getInt(index)) : getInt(index);
    }

    /**
     * Writes an int (four bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than four bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final void putInt(int index, int value) {
        if (index < 0 || index > this.memory.length - 4) {
            throw new IndexOutOfBoundsException();
        }
        UNSAFE.putInt(this.memory, BASE_OFFSET + index, value);
    }

    /** Writes an int in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final void putIntLittleEndian(int index, int value) {
        putInt(index, LITTLE_ENDIAN ? value : Integer.reverseBytes(value));
    }

    /** Writes an int in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final void putIntBigEndian(int index, int value) {
        putInt(index, LITTLE_ENDIAN ? Integer.reverseBytes(value) : value);
    }

    /**
     * Reads a long (eight bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than eight bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final long getLong(int index) {
        if (index < 0 || index > this.memory.length - 8) {
            throw new IndexOutOfBoundsException();
        }
        return UNSAFE.getLong(this.memory, BASE_OFFSET + index);
    }

    /** Reads a long in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final long getLongLittleEndian(int index) {
        return LITTLE_ENDIAN ? getLong(index) : Long.reverseBytes(getLong(index));
    }

    /** Reads a long in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final long getLongBigEndian(int index) {
        return LITTLE_ENDIAN ? Long.reverseBytes(getLong(index)) : getLong(index);
    }

    /**
     * Writes a long (eight bytes) at the given index in the platform's native byte order.
     *
     * @throws IndexOutOfBoundsException Thrown, if fewer than eight bytes are available at the index.
     */
    @SuppressWarnings("restriction")
    public final void putLong(int index, long value) {
        if (index < 0 || index > this.memory.length - 8) {
            throw new IndexOutOfBoundsException();
        }
        UNSAFE.putLong(this.memory, BASE_OFFSET + index, value);
    }

    /** Writes a long in little-endian byte order, reversing the bytes on big-endian platforms. */
    public final void putLongLittleEndian(int index, long value) {
        putLong(index, LITTLE_ENDIAN ? value : Long.reverseBytes(value));
    }

    /** Writes a long in big-endian byte order, reversing the bytes on little-endian platforms. */
    public final void putLongBigEndian(int index, long value) {
        putLong(index, LITTLE_ENDIAN ? Long.reverseBytes(value) : value);
    }

    /** Reads a float as the int bits stored at the index, in native byte order. */
    public final float getFloat(int index) {
        return Float.intBitsToFloat(getInt(index));
    }

    /** Reads a float as the int bits stored at the index, in little-endian byte order. */
    public final float getFloatLittleEndian(int index) {
        return Float.intBitsToFloat(getIntLittleEndian(index));
    }

    /** Reads a float as the int bits stored at the index, in big-endian byte order. */
    public final float getFloatBigEndian(int index) {
        return Float.intBitsToFloat(getIntBigEndian(index));
    }

    /** Writes the raw int bits of the float at the index, in native byte order. */
    public final void putFloat(int index, float value) {
        putInt(index, Float.floatToRawIntBits(value));
    }

    /** Writes the raw int bits of the float at the index, in little-endian byte order. */
    public final void putFloatLittleEndian(int index, float value) {
        putIntLittleEndian(index, Float.floatToRawIntBits(value));
    }

    /** Writes the raw int bits of the float at the index, in big-endian byte order. */
    public final void putFloatBigEndian(int index, float value) {
        putIntBigEndian(index, Float.floatToRawIntBits(value));
    }

    /** Reads a double as the long bits stored at the index, in native byte order. */
    public final double getDouble(int index) {
        return Double.longBitsToDouble(getLong(index));
    }

    /** Reads a double as the long bits stored at the index, in little-endian byte order. */
    public final double getDoubleLittleEndian(int index) {
        return Double.longBitsToDouble(getLongLittleEndian(index));
    }

    /** Reads a double as the long bits stored at the index, in big-endian byte order. */
    public final double getDoubleBigEndian(int index) {
        return Double.longBitsToDouble(getLongBigEndian(index));
    }

    /** Writes the raw long bits of the double at the index, in native byte order. */
    public final void putDouble(int index, double value) {
        putLong(index, Double.doubleToRawLongBits(value));
    }

    /** Writes the raw long bits of the double at the index, in little-endian byte order. */
    public final void putDoubleLittleEndian(int index, double value) {
        putLongLittleEndian(index, Double.doubleToRawLongBits(value));
    }

    /** Writes the raw long bits of the double at the index, in big-endian byte order. */
    public final void putDoubleBigEndian(int index, double value) {
        putLongBigEndian(index, Double.doubleToRawLongBits(value));
    }

    // -------------------------------------------------------------------------
    //  Bulk Read and Write Methods
    // -------------------------------------------------------------------------

    /** Writes {@code length} bytes of this segment, starting at {@code offset}, to the given output. */
    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }

    /** Reads exactly {@code length} bytes from the given input into this segment at {@code offset}. */
    public final void put(DataInput in, int offset, int length) throws IOException {
        in.readFully(this.memory, offset, length);
    }

    /** Transfers {@code numBytes} bytes of this segment, starting at {@code offset}, into the target buffer. */
    public final void get(int offset, ByteBuffer target, int numBytes) {
        // the ByteBuffer performs the boundary checks
        target.put(this.memory, offset, numBytes);
    }

    /** Transfers {@code numBytes} bytes from the source buffer into this segment at {@code offset}. */
    public final void put(int offset, ByteBuffer source, int numBytes) {
        // the ByteBuffer performs the boundary checks
        source.get(this.memory, offset, numBytes);
    }

    /** Copies {@code numBytes} bytes from this segment at {@code offset} to the target segment at {@code targetOffset}. */
    public final void copyTo(int offset, PureHeapMemorySegment target, int targetOffset, int numBytes) {
        // System.arraycopy performs the boundary checks, no extra check needed
        System.arraycopy(this.memory, offset, target.memory, targetOffset, numBytes);
    }

    // -------------------------------------------------------------------------
    //  Comparisons & Swapping
    // -------------------------------------------------------------------------

    /**
     * Compares {@code len} bytes of this segment with bytes of another segment, treating each
     * byte as an unsigned value.
     *
     * @param seg2 The segment to compare against.
     * @param offset1 The start offset in this segment.
     * @param offset2 The start offset in the other segment.
     * @param len The number of bytes to compare.
     * @return The difference of the first pair of differing bytes (this minus other),
     *         or zero if all compared bytes are equal.
     */
    public final int compare(PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
        final byte[] b2 = seg2.memory;
        final byte[] b1 = this.memory;

        for (int pos = 0; pos < len; pos++) {
            final int diff = (b1[offset1 + pos] & 0xff) - (b2[offset2 + pos] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return 0;
    }

    /**
     * Swaps {@code len} bytes between this segment and another segment without an auxiliary
     * buffer, in chunks of eight bytes first and then byte by byte.
     */
    public final void swapBytes(PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
        while (len >= 8) {
            final long tmp = this.getLong(offset1);
            this.putLong(offset1, seg2.getLong(offset2));
            seg2.putLong(offset2, tmp);
            offset1 += 8;
            offset2 += 8;
            len -= 8;
        }
        while (len > 0) {
            final byte tmp = this.get(offset1);
            this.put(offset1, seg2.get(offset2));
            seg2.put(offset2, tmp);
            offset1++;
            offset2++;
            len--;
        }
    }

    /**
     * Swaps {@code len} bytes between this segment and another segment using three array
     * copies through the given auxiliary buffer, which must hold at least {@code len} bytes.
     */
    public final void swapBytes(byte[] auxBuffer, PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
        final byte[] otherMem = seg2.memory;
        System.arraycopy(this.memory, offset1, auxBuffer, 0, len);
        System.arraycopy(otherMem, offset2, this.memory, offset1, len);
        System.arraycopy(auxBuffer, 0, otherMem, offset2, len);
    }

    // --------------------------------------------------------------------------------------------
    //  Utilities for native memory accesses and checks
    // --------------------------------------------------------------------------------------------

    /** The unsafe handle used for the multi-byte primitive accesses. */
    @SuppressWarnings("restriction")
    private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

    /** The offset of the first element of a byte array, as reported by the unsafe handle. */
    @SuppressWarnings("restriction")
    private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
}
// http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java deleted file mode 100644 index 1e3b89e..0000000 --- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java +++ /dev/null @@ -1,359 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.core.memory.benchmarks;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.util.List;

/**
 * A {@link DataOutputView} that writes into a sequence of {@link PureHeapMemorySegment}s.
 *
 * <p>Segments are taken from the end of a source list as needed and every segment that is
 * written to is appended to a target list. Multi-byte values are written in big-endian byte
 * order and may span the boundary between two segments.
 */
public final class PureHeapMemorySegmentOutView implements DataOutputView {

    /** The current memory segment to write to. */
    private PureHeapMemorySegment currentSegment;

    /** The write offset in the current segment. */
    private int positionInSegment;

    /** The size of the memory segments. */
    private final int segmentSize;

    /** The list from whose end new segments are taken. */
    private final List<PureHeapMemorySegment> memorySource;

    /** The list to which every segment that is used for writing is added. */
    private final List<PureHeapMemorySegment> fullSegments;

    /** The reusable array for UTF encodings; grown on demand in {@link #writeUTF(String)}. */
    private byte[] utfBuffer;

    /**
     * Creates the view, takes the last segment of the source list as the first segment to
     * write to, and registers that segment in the target list.
     *
     * @param emptySegments The list that supplies segments; segments are removed from its end.
     * @param fullSegmentTarget The list that collects the segments that were written to.
     * @param segmentSize The number of bytes written per segment before advancing.
     */
    public PureHeapMemorySegmentOutView(List<PureHeapMemorySegment> emptySegments,
            List<PureHeapMemorySegment> fullSegmentTarget, int segmentSize) {
        this.segmentSize = segmentSize;
        this.currentSegment = emptySegments.remove(emptySegments.size() - 1);

        this.memorySource = emptySegments;
        this.fullSegments = fullSegmentTarget;
        this.fullSegments.add(getCurrentSegment());
    }

    /**
     * Restarts writing on a fresh segment from the source list.
     *
     * @throws IllegalStateException Thrown, if the target list still contains segments.
     * @throws RuntimeException Thrown, if no segment could be obtained from the source list.
     */
    public void reset() {
        if (this.fullSegments.size() != 0) {
            throw new IllegalStateException("The target list still contains memory segments.");
        }

        clear();
        try {
            advance();
        } catch (IOException ioex) {
            throw new RuntimeException("Error getting first segment for record collector.", ioex);
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Page Management
    // --------------------------------------------------------------------------------------------

    /**
     * Takes the last segment from the source list and appends it to the target list.
     * The two parameters are not used by this implementation.
     *
     * @param current The segment that was written to so far (unused).
     * @param positionInCurrent The write position in that segment (unused).
     * @return The next segment to write to.
     * @throws EOFException Thrown, if the source list is empty.
     */
    public PureHeapMemorySegment nextSegment(PureHeapMemorySegment current, int positionInCurrent) throws EOFException {
        final int available = this.memorySource.size();
        if (available <= 0) {
            throw new EOFException();
        }

        final PureHeapMemorySegment next = this.memorySource.remove(available - 1);
        this.fullSegments.add(next);
        return next;
    }

    /** Gets the segment that is currently written to. */
    public PureHeapMemorySegment getCurrentSegment() {
        return this.currentSegment;
    }

    /** Gets the write position within the current segment. */
    public int getCurrentPositionInSegment() {
        return this.positionInSegment;
    }

    /** Gets the segment size this view was created with. */
    public int getSegmentSize() {
        return this.segmentSize;
    }

    /** Moves to the next segment and resets the write position to the segment's start. */
    protected void advance() throws IOException {
        this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
        this.positionInSegment = 0;
    }

    /** Makes the given segment the current one and sets the write position in it. */
    protected void seekOutput(PureHeapMemorySegment seg, int position) {
        this.currentSegment = seg;
        this.positionInSegment = position;
    }

    /** Drops the current segment and resets the write position to zero. */
    protected void clear() {
        this.currentSegment = null;
        this.positionInSegment = 0;
    }

    // --------------------------------------------------------------------------------------------
    //  Data Output Specific methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void write(int b) throws IOException {
        writeByte(b);
    }

    @Override
    public void write(byte[] b) throws IOException {
        write(b, 0, b.length);
    }

    /**
     * Writes {@code len} bytes from the array, advancing to further segments whenever the
     * current segment is full.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        int remaining = this.segmentSize - this.positionInSegment;

        // fast path: everything fits into the current segment
        if (remaining >= len) {
            this.currentSegment.put(this.positionInSegment, b, off, len);
            this.positionInSegment += len;
            return;
        }

        if (remaining == 0) {
            advance();
            remaining = this.segmentSize - this.positionInSegment;
        }

        while (true) {
            final int toPut = Math.min(remaining, len);
            this.currentSegment.put(this.positionInSegment, b, off, toPut);
            off += toPut;
            len -= toPut;

            if (len <= 0) {
                this.positionInSegment += toPut;
                break;
            }

            // the current segment is full, mark it as such before moving on
            this.positionInSegment = this.segmentSize;
            advance();
            remaining = this.segmentSize - this.positionInSegment;
        }
    }

    @Override
    public void writeBoolean(boolean v) throws IOException {
        writeByte(v ? 1 : 0);
    }

    @Override
    public void writeByte(int v) throws IOException {
        if (this.positionInSegment < this.segmentSize) {
            this.currentSegment.put(this.positionInSegment++, (byte) v);
        } else {
            advance();
            writeByte(v);
        }
    }

    @Override
    public void writeShort(int v) throws IOException {
        if (this.positionInSegment < this.segmentSize - 1) {
            this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v);
            this.positionInSegment += 2;
        } else if (this.positionInSegment == this.segmentSize) {
            advance();
            writeShort(v);
        } else {
            // the value spans two segments: write it byte-wise, high byte first
            writeByte(v >> 8);
            writeByte(v);
        }
    }

    @Override
    public void writeChar(int v) throws IOException {
        if (this.positionInSegment < this.segmentSize - 1) {
            this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
            this.positionInSegment += 2;
        } else if (this.positionInSegment == this.segmentSize) {
            advance();
            writeChar(v);
        } else {
            // the value spans two segments: write it byte-wise, high byte first
            writeByte(v >> 8);
            writeByte(v);
        }
    }

    @Override
    public void writeInt(int v) throws IOException {
        if (this.positionInSegment < this.segmentSize - 3) {
            this.currentSegment.putIntBigEndian(this.positionInSegment, v);
            this.positionInSegment += 4;
        } else if (this.positionInSegment == this.segmentSize) {
            advance();
            writeInt(v);
        } else {
            // the value spans two segments: write it byte-wise, high byte first
            writeByte(v >> 24);
            writeByte(v >> 16);
            writeByte(v >> 8);
            writeByte(v);
        }
    }

    @Override
    public void writeLong(long v) throws IOException {
        if (this.positionInSegment < this.segmentSize - 7) {
            this.currentSegment.putLongBigEndian(this.positionInSegment, v);
            this.positionInSegment += 8;
        } else if (this.positionInSegment == this.segmentSize) {
            advance();
            writeLong(v);
        } else {
            // the value spans two segments: write it byte-wise, high byte first
            writeByte((int) (v >> 56));
            writeByte((int) (v >> 48));
            writeByte((int) (v >> 40));
            writeByte((int) (v >> 32));
            writeByte((int) (v >> 24));
            writeByte((int) (v >> 16));
            writeByte((int) (v >> 8));
            writeByte((int) v);
        }
    }

    @Override
    public void writeFloat(float v) throws IOException {
        writeInt(Float.floatToRawIntBits(v));
    }

    @Override
    public void writeDouble(double v) throws IOException {
        writeLong(Double.doubleToRawLongBits(v));
    }

    @Override
    public void writeBytes(String s) throws IOException {
        for (int i = 0; i < s.length(); i++) {
            writeByte(s.charAt(i));
        }
    }

    @Override
    public void writeChars(String s) throws IOException {
        for (int i = 0; i < s.length(); i++) {
            writeChar(s.charAt(i));
        }
    }

    /**
     * Writes the string as a two-byte big-endian length followed by the encoded characters:
     * one byte for chars in [0x0001, 0x007F], three bytes for chars above 0x07FF, and two
     * bytes for everything else (including 0x0000).
     *
     * @throws UTFDataFormatException Thrown, if the encoded form exceeds 65535 bytes.
     */
    @Override
    public void writeUTF(String str) throws IOException {
        final int strlen = str.length();
        int utflen = 0;
        int c;
        int count = 0;

        // use charAt instead of copying the String to a char array
        for (int i = 0; i < strlen; i++) {
            c = str.charAt(i);
            if ((c >= 0x0001) && (c <= 0x007F)) {
                utflen++;
            } else if (c > 0x07FF) {
                utflen += 3;
            } else {
                utflen += 2;
            }
        }

        if (utflen > 65535) {
            // utflen counts encoded bytes, so report it as such
            throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
        }

        if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) {
            this.utfBuffer = new byte[utflen + 2];
        }
        final byte[] bytearr = this.utfBuffer;

        bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
        bytearr[count++] = (byte) (utflen & 0xFF);

        // leading run of single-byte characters
        int i = 0;
        for (; i < strlen; i++) {
            c = str.charAt(i);
            if (!((c >= 0x0001) && (c <= 0x007F))) {
                break;
            }
            bytearr[count++] = (byte) c;
        }

        // remainder, which may mix all three encodings
        for (; i < strlen; i++) {
            c = str.charAt(i);
            if ((c >= 0x0001) && (c <= 0x007F)) {
                bytearr[count++] = (byte) c;
            } else if (c > 0x07FF) {
                bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
                bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
                bytearr[count++] = (byte) (0x80 | (c & 0x3F));
            } else {
                bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
                bytearr[count++] = (byte) (0x80 | (c & 0x3F));
            }
        }

        write(bytearr, 0, utflen + 2);
    }

    /** Advances the write position by the given number of bytes, moving across segments as needed. */
    @Override
    public void skipBytesToWrite(int numBytes) throws IOException {
        while (numBytes > 0) {
            final int remaining = this.segmentSize - this.positionInSegment;
            if (numBytes <= remaining) {
                this.positionInSegment += numBytes;
                return;
            }
            this.positionInSegment = this.segmentSize;
            advance();
            numBytes -= remaining;
        }
    }

    /** Copies the given number of bytes from the source view into this view, moving across segments as needed. */
    @Override
    public void write(DataInputView source, int numBytes) throws IOException {
        while (numBytes > 0) {
            final int remaining = this.segmentSize - this.positionInSegment;
            if (numBytes <= remaining) {
                this.currentSegment.put(source, this.positionInSegment, numBytes);
                this.positionInSegment += numBytes;
                return;
            }

            if (remaining > 0) {
                this.currentSegment.put(source, this.positionInSegment, remaining);
                this.positionInSegment = this.segmentSize;
                numBytes -= remaining;
            }

            advance();
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java deleted file mode 100644 index 57817b9..0000000 --- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java +++ /dev/null @@ -1,887 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory.benchmarks; - -import org.apache.flink.core.memory.MemoryUtils; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Field; -import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -public final class PureHybridMemorySegment { - - /** Constant that flags the byte order. Because this is a boolean constant, - * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */ - private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); - - /** The direct byte buffer that allocated the off-heap memory. This memory segment holds a reference - * to that buffer, so as long as this memory segment lives, the memory will not be released. */ - private final ByteBuffer offHeapMemory; - - /** The heap byte array object relative to which we access the memory. Is non-null if the - * memory is on the heap, is null, if the memory if off the heap. If we have this buffer, we - * must never void this reference, or the memory segment will point to undefined addresses - * outside the heap and may in out-of-order execution cases cause segmentation faults. */ - private final byte[] heapMemory; - - /** The address to the data, relative to the heap memory byte array. If the heap memory byte array - * is null, this becomes an absolute memory address outside the heap. 
*/ - private long address; - - /** The address one byte after the last addressable byte. - * This is address + size while the segment is not disposed */ - private final long addressLimit; - - /** The size in bytes of the memory segment */ - private final int size; - - // ------------------------------------------------------------------------- - // Constructors - // ------------------------------------------------------------------------- - - /** - * Creates a new memory segment that represents the memory backing the given direct byte buffer. - * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)}, - * otherwise this method with throw an IllegalArgumentException. - * - * @param buffer The byte buffer whose memory is represented by this memory segment. - * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct. - */ - public PureHybridMemorySegment(ByteBuffer buffer) { - if (buffer == null || !buffer.isDirect()) { - throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer."); - } - - this.offHeapMemory = buffer; - this.heapMemory = null; - this.size = buffer.capacity(); - this.address = getAddress(buffer); - this.addressLimit = this.address + size; - - if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) { - throw new RuntimeException("Segment initialized with too large address: " + address - + " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1)); - } - } - - /** - * Creates a new memory segment that represents the memory of the byte array. - * - * @param buffer The byte array whose memory is represented by this memory segment. 
- */ - public PureHybridMemorySegment(byte[] buffer) { - if (buffer == null) { - throw new NullPointerException("buffer"); - } - - this.offHeapMemory = null; - this.heapMemory = buffer; - this.address = BYTE_ARRAY_BASE_OFFSET; - this.addressLimit = BYTE_ARRAY_BASE_OFFSET + buffer.length; - this.size = buffer.length; - } - - // ------------------------------------------------------------------------- - // Memory Segment Specifics - // ------------------------------------------------------------------------- - - /** - * Gets the size of the memory segment, in bytes. - * @return The size of the memory segment. - */ - public final int size() { - return size; - } - - /** - * Checks whether the memory segment was freed. - * @return True, if the memory segment has been freed, false otherwise. - */ - public final boolean isFreed() { - return this.address > this.addressLimit; - } - - /** - * Frees this memory segment. After this operation has been called, no further operations are - * possible on the memory segment and will fail. The actual memory (heap or off-heap) will only - * be released after this memory segment object has become garbage collected. - */ - public final void free() { - // this ensures we can place no more data and trigger - // the checks for the freed segment - address = addressLimit + 1; - } - - /** - * Checks whether this memory segment is backed by off-heap memory. - * @return True, if the memory segment is backed by off-heap memory, false if it is backed - * by heap memory. - */ - public final boolean isOffHeap() { - return heapMemory == null; - } - - public byte[] getArray() { - if (heapMemory != null) { - return heapMemory; - } else { - throw new IllegalStateException("Memory segment does not represent heap memory"); - } - } - - /** - * Gets the buffer that owns the memory of this memory segment. - * - * @return The byte buffer that owns the memory of this memory segment. 
- */ - public ByteBuffer getOffHeapBuffer() { - if (offHeapMemory != null) { - return offHeapMemory; - } else { - throw new IllegalStateException("Memory segment does not represent off heap memory"); - } - } - - public ByteBuffer wrap(int offset, int length) { - if (offset < 0 || offset > this.size || offset > this.size - length) { - throw new IndexOutOfBoundsException(); - } - - if (heapMemory != null) { - return ByteBuffer.wrap(heapMemory, offset, length); - } - else { - ByteBuffer wrapper = offHeapMemory.duplicate(); - wrapper.limit(offset + length); - wrapper.position(offset); - return wrapper; - } - } - - /** - * Gets this memory segment as a pure heap memory segment. - * - * @return A heap memory segment variant of this memory segment. - * @throws UnsupportedOperationException Thrown, if this memory segment is not - * backed by heap memory. - */ - public final PureHeapMemorySegment asHeapSegment() { - if (heapMemory != null) { - return new PureHeapMemorySegment(heapMemory); - } else { - throw new UnsupportedOperationException("Memory segment is not backed by heap memory"); - } - } - - /** - * Gets this memory segment as a pure off-heap memory segment. - * - * @return An off-heap memory segment variant of this memory segment. - * @throws UnsupportedOperationException Thrown, if this memory segment is not - * backed by off-heap memory. 
- */ - public final PureOffHeapMemorySegment asOffHeapSegment() { - if (offHeapMemory != null) { - return new PureOffHeapMemorySegment(offHeapMemory); - } else { - throw new UnsupportedOperationException("Memory segment is not backed by off-heap memory"); - } - } - - // ------------------------------------------------------------------------ - // Random Access get() and put() methods - // ------------------------------------------------------------------------ - - @SuppressWarnings("restriction") - public final byte get(int index) { - final long pos = address + index; - if (index >= 0 && pos < addressLimit) { - return UNSAFE.getByte(heapMemory, pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - @SuppressWarnings("restriction") - public final void put(int index, byte b) { - final long pos = address + index; - if (index >= 0 && pos < addressLimit) { - UNSAFE.putByte(heapMemory, pos, b); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void get(int index, byte[] dst) { - get(index, dst, 0, dst.length); - } - - public final void put(int index, byte[] src) { - put(index, src, 0, src.length); - } - - @SuppressWarnings("restriction") - public final void get(int index, byte[] dst, int offset, int length) { - // check the byte array offset and length - if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) { - throw new IndexOutOfBoundsException(); - } - - long pos = address + index; - - if (index >= 0 && pos <= addressLimit - length) { - long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; - - // the copy must proceed in batches not too large, because the JVM may - // poll for points that are safe for GC (moving the array and 
changing its address) - while (length > 0) { - long toCopy = Math.min(length, COPY_PER_BATCH); - UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, toCopy); - length -= toCopy; - pos += toCopy; - arrayAddress += toCopy; - } - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - @SuppressWarnings("restriction") - public final void put(int index, byte[] src, int offset, int length) { - // check the byte array offset and length - if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) { - throw new IndexOutOfBoundsException(); - } - - long pos = address + index; - - if (index >= 0 && pos <= addressLimit - length) { - long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; - while (length > 0) { - long toCopy = Math.min(length, COPY_PER_BATCH); - UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, toCopy); - length -= toCopy; - pos += toCopy; - arrayAddress += toCopy; - } - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final boolean getBoolean(int index) { - return get(index) != 0; - } - - public final void putBoolean(int index, boolean value) { - put(index, (byte) (value ? 
1 : 0)); - } - - @SuppressWarnings("restriction") - public final char getChar(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - return UNSAFE.getChar(heapMemory, pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final char getCharLittleEndian(int index) { - if (LITTLE_ENDIAN) { - return getChar(index); - } else { - return Character.reverseBytes(getChar(index)); - } - } - - public final char getCharBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Character.reverseBytes(getChar(index)); - } else { - return getChar(index); - } - } - - @SuppressWarnings("restriction") - public final void putChar(int index, char value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - UNSAFE.putChar(heapMemory, pos, value); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putCharLittleEndian(int index, char value) { - if (LITTLE_ENDIAN) { - putChar(index, value); - } else { - putChar(index, Character.reverseBytes(value)); - } - } - - public final void putCharBigEndian(int index, char value) { - if (LITTLE_ENDIAN) { - putChar(index, Character.reverseBytes(value)); - } else { - putChar(index, value); - } - } - - @SuppressWarnings("restriction") - public final short getShort(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - return UNSAFE.getShort(heapMemory, pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final short getShortLittleEndian(int index) { - 
if (LITTLE_ENDIAN) { - return getShort(index); - } else { - return Short.reverseBytes(getShort(index)); - } - } - - public final short getShortBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Short.reverseBytes(getShort(index)); - } else { - return getShort(index); - } - } - - @SuppressWarnings("restriction") - public final void putShort(int index, short value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - UNSAFE.putShort(heapMemory, pos, value); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putShortLittleEndian(int index, short value) { - if (LITTLE_ENDIAN) { - putShort(index, value); - } else { - putShort(index, Short.reverseBytes(value)); - } - } - - public final void putShortBigEndian(int index, short value) { - if (LITTLE_ENDIAN) { - putShort(index, Short.reverseBytes(value)); - } else { - putShort(index, value); - } - } - - @SuppressWarnings("restriction") - public final int getInt(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 4) { - return UNSAFE.getInt(heapMemory, pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final int getIntLittleEndian(int index) { - if (LITTLE_ENDIAN) { - return getInt(index); - } else { - return Integer.reverseBytes(getInt(index)); - } - } - - public final int getIntBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Integer.reverseBytes(getInt(index)); - } else { - return getInt(index); - } - } - - @SuppressWarnings("restriction") - public final void putInt(int index, int value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 4) { - UNSAFE.putInt(heapMemory, pos, value); 
- } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putIntLittleEndian(int index, int value) { - if (LITTLE_ENDIAN) { - putInt(index, value); - } else { - putInt(index, Integer.reverseBytes(value)); - } - } - - public final void putIntBigEndian(int index, int value) { - if (LITTLE_ENDIAN) { - putInt(index, Integer.reverseBytes(value)); - } else { - putInt(index, value); - } - } - - @SuppressWarnings("restriction") - public final long getLong(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 8) { - return UNSAFE.getLong(heapMemory, pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final long getLongLittleEndian(int index) { - if (LITTLE_ENDIAN) { - return getLong(index); - } else { - return Long.reverseBytes(getLong(index)); - } - } - - public final long getLongBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Long.reverseBytes(getLong(index)); - } else { - return getLong(index); - } - } - - @SuppressWarnings("restriction") - public final void putLong(int index, long value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 8) { - UNSAFE.putLong(heapMemory, pos, value); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putLongLittleEndian(int index, long value) { - if (LITTLE_ENDIAN) { - putLong(index, value); - } else { - putLong(index, Long.reverseBytes(value)); - } - } - - public final void putLongBigEndian(int index, long value) { - if (LITTLE_ENDIAN) { - putLong(index, 
Long.reverseBytes(value)); - } else { - putLong(index, value); - } - } - - public final float getFloat(int index) { - return Float.intBitsToFloat(getInt(index)); - } - - public final float getFloatLittleEndian(int index) { - return Float.intBitsToFloat(getIntLittleEndian(index)); - } - - public final float getFloatBigEndian(int index) { - return Float.intBitsToFloat(getIntBigEndian(index)); - } - - public final void putFloat(int index, float value) { - putInt(index, Float.floatToRawIntBits(value)); - } - - public final void putFloatLittleEndian(int index, float value) { - putIntLittleEndian(index, Float.floatToRawIntBits(value)); - } - - public final void putFloatBigEndian(int index, float value) { - putIntBigEndian(index, Float.floatToRawIntBits(value)); - } - - public final double getDouble(int index) { - return Double.longBitsToDouble(getLong(index)); - } - - public final double getDoubleLittleEndian(int index) { - return Double.longBitsToDouble(getLongLittleEndian(index)); - } - - public final double getDoubleBigEndian(int index) { - return Double.longBitsToDouble(getLongBigEndian(index)); - } - - public final void putDouble(int index, double value) { - putLong(index, Double.doubleToRawLongBits(value)); - } - - public final void putDoubleLittleEndian(int index, double value) { - putLongLittleEndian(index, Double.doubleToRawLongBits(value)); - } - - public final void putDoubleBigEndian(int index, double value) { - putLongBigEndian(index, Double.doubleToRawLongBits(value)); - } - - // ------------------------------------------------------------------------- - // Bulk Read and Write Methods - // ------------------------------------------------------------------------- - - public final void get(DataOutput out, int offset, int length) throws IOException { - if (heapMemory != null) { - out.write(heapMemory, offset, length); - } - else { - while (length >= 8) { - out.writeLong(getLongBigEndian(offset)); - offset += 8; - length -= 8; - } - - while (length > 0) { - 
out.writeByte(get(offset)); - offset++; - length--; - } - } - } - - /** - * Bulk put method. Reads {@code length} bytes from the given DataInput and stores them in - * this segment, beginning at position {@code offset}. - * - * @param in The DataInput to read the bytes from. - * @param offset The position in this segment where the first byte is stored. - * @param length The number of bytes to transfer. - * @throws IOException Thrown, if reading from the DataInput fails. - */ - public final void put(DataInput in, int offset, int length) throws IOException { - if (heapMemory != null) { - in.readFully(heapMemory, offset, length); - } - else { - // readLong() assembles the value big-endian, so it is stored big-endian to keep - // the byte order of the stream - while (length >= 8) { - putLongBigEndian(offset, in.readLong()); - offset += 8; - length -= 8; - } - while(length > 0) { - put(offset, in.readByte()); - offset++; - length--; - } - } - } - - /** - * Bulk get method. Copies {@code numBytes} bytes from this segment, beginning at position - * {@code offset}, into the target buffer at its current position. On success, the position - * of the target buffer is advanced by {@code numBytes} on every code path. - * - * @param offset The position in this segment of the first byte to copy. - * @param target The buffer that receives the bytes. - * @param numBytes The number of bytes to copy. - * @throws IndexOutOfBoundsException Thrown, if the offset or the number of bytes is invalid. - * @throws BufferOverflowException Thrown, if the target has fewer than {@code numBytes} bytes remaining. - * @throws IllegalStateException Thrown on the off-heap paths, if this segment has been freed. - */ - @SuppressWarnings("restriction") - public final void get(int offset, ByteBuffer target, int numBytes) { - if (heapMemory != null) { - // ByteBuffer performs the boundary checks - target.put(heapMemory, offset, numBytes); - } - else { - // check the byte array offset and length - if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { - throw new IndexOutOfBoundsException(); - } - - final int targetOffset = target.position(); - final int remaining = target.remaining(); - - if (remaining < numBytes) { - throw new BufferOverflowException(); - } - - if (target.isDirect()) { - // copy to the target memory directly - final long targetPointer = getAddress(target) + targetOffset; - final long sourcePointer = address + offset; - - if (sourcePointer <= addressLimit - numBytes) { - UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); - // advance the target, as the heap and array paths do - target.position(targetOffset + numBytes); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - throw new IndexOutOfBoundsException(); - } - } - else if (target.hasArray()) { - // move directly into the byte array - get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes); - - // this must be after the get() call to ensure that the byte buffer is not - // modified in case the call fails - target.position(targetOffset + numBytes); - } - else { - // neither heap buffer nor direct buffer: copy exactly numBytes bytes, - // not everything that still fits into the target - for (int i = 0; i < numBytes; i++) { - target.put(get(offset + i)); - } - } - } - } - - /** - * Bulk put method. Copies {@code numBytes} bytes from the source buffer, beginning at its - * current position, into this segment at position {@code offset}. On success, the position - * of the source buffer is advanced by {@code numBytes} on every code path. - * - * @param offset The position in this segment where the first byte is stored. - * @param source The buffer to read the bytes from. - * @param numBytes The number of bytes to copy. - * @throws IndexOutOfBoundsException Thrown, if the offset or the number of bytes is invalid. - * @throws BufferUnderflowException Thrown, if the source has fewer than {@code numBytes} bytes remaining. - * @throws IllegalStateException Thrown on the off-heap paths, if this segment has been freed. - */ - @SuppressWarnings("restriction") - public final void put(int offset, ByteBuffer 
source, int numBytes) { - if (heapMemory != null) { - // ByteBuffer performs the boundary checks - source.get(heapMemory, offset, numBytes); - } - else { - // check the byte array offset and length - if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { - throw new IndexOutOfBoundsException(); - } - - final int sourceOffset = source.position(); - final int remaining = source.remaining(); - - if (remaining < numBytes) { - throw new BufferUnderflowException(); - } - - if (source.isDirect()) { - // copy from the source memory directly - final long sourcePointer = getAddress(source) + sourceOffset; - final long targetPointer = address + offset; - - // the write position inside this segment is what must stay below the limit; - // the source pointer is a foreign address and says nothing about this segment - if (targetPointer <= addressLimit - numBytes) { - UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); - // advance the source, as the heap and array paths do - source.position(sourceOffset + numBytes); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - throw new IndexOutOfBoundsException(); - } - } - else if (source.hasArray()) { - // move directly out of the byte array - put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes); - - // this must be after the put() call to ensure that the byte buffer is not - // modified in case the call fails - source.position(sourceOffset + numBytes); - } - else { - // neither heap buffer nor direct buffer: copy exactly numBytes bytes, - // not everything that is still left in the source - for (int i = 0; i < numBytes; i++) { - put(offset + i, source.get()); - } - } - } - } - - @SuppressWarnings("restriction") - public final void copyTo(int offset, PureHybridMemorySegment target, int targetOffset, int numBytes) { - final byte[] thisHeapRef = this.heapMemory; - final byte[] otherHeapRef = target.heapMemory; - final long thisPointer = this.address + offset; - final long otherPointer = target.address + targetOffset; - - if (numBytes >= 0 & thisPointer <= this.addressLimit - numBytes & otherPointer <= target.addressLimit - numBytes) { - UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes); - } - else if (address > addressLimit | target.address > target.addressLimit) { - throw 
new IllegalStateException("segment has been freed."); - } - else { - throw new IndexOutOfBoundsException(); - } - } - - /** - * Compares {@code len} bytes of this segment, beginning at {@code offset1}, with {@code len} - * bytes of the given segment, beginning at {@code offset2}. The bytes are compared as unsigned - * values, eight at a time (as big-endian longs) while possible, then one by one. - * - * @param seg2 The segment to compare against. - * @param offset1 The position in this segment where the comparison starts. - * @param offset2 The position in {@code seg2} where the comparison starts. - * @param len The number of bytes to compare. - * @return 0 if the regions are equal, a negative value if this region is smaller, - * a positive value if it is larger. - */ - public int compare(PureHybridMemorySegment seg2, int offset1, int offset2, int len) { - while (len >= 8) { - long l1 = this.getLongBigEndian(offset1); - long l2 = seg2.getLongBigEndian(offset2); - - if (l1 != l2) { - // unsigned comparison of two signed longs - return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1; - } - - offset1 += 8; - offset2 += 8; - len -= 8; - } - while (len > 0) { - int b1 = this.get(offset1) & 0xff; - int b2 = seg2.get(offset2) & 0xff; - int cmp = b1 - b2; - if (cmp != 0) { - return cmp; - } - offset1++; - offset2++; - len--; - } - return 0; - } - - /** - * Swaps {@code len} bytes between this segment, beginning at {@code offset1}, and the given - * segment, beginning at {@code offset2}. - * - * @param tempBuffer Auxiliary array used when {@code len} is 32 or more; it must then hold - * at least {@code len} bytes. - * @param seg2 The segment to swap bytes with. - * @param offset1 The position in this segment where the swapped region starts. - * @param offset2 The position in {@code seg2} where the swapped region starts. - * @param len The number of bytes to swap. - * @throws IndexOutOfBoundsException Thrown, if an offset or the length is invalid, or the - * auxiliary array is too small. - * @throws IllegalStateException Thrown, if one of the two segments has been freed. - */ - public void swapBytes(byte[] tempBuffer, PureHybridMemorySegment seg2, int offset1, int offset2, int len) { - if (len < 32) { - // fast path for short copies - while (len >= 8) { - long tmp = this.getLong(offset1); - this.putLong(offset1, seg2.getLong(offset2)); - seg2.putLong(offset2, tmp); - offset1 += 8; - offset2 += 8; - len -= 8; - } - while (len > 0) { - byte tmp = this.get(offset1); - this.put(offset1, seg2.get(offset2)); - seg2.put(offset2, tmp); - offset1++; - offset2++; - len--; - } - } - else if ( (offset1 | offset2 | len | (offset1 + len) | (offset2 + len) | - (this.size - (offset1 + len)) | (seg2.size() - (offset2 + len))) < 0 || len > tempBuffer.length) - { - throw new IndexOutOfBoundsException(); - } - else { - final long thisPos = this.address + offset1; - final long otherPos = seg2.address + offset2; - - if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) { - final long arrayAddress = BYTE_ARRAY_BASE_OFFSET; - - // this -> temp buffer - UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, arrayAddress, len); - - // other -> this - UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len); - - // temp buffer -> other - UNSAFE.copyMemory(tempBuffer, arrayAddress, seg2.heapMemory, otherPos, len); - } - // free() moves the address past the limit, so that is the condition that marks a freed segment - else if (this.address > this.addressLimit || seg2.address > seg2.addressLimit) { - throw new 
IllegalStateException("Memory segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Utilities for native memory accesses and checks - // -------------------------------------------------------------------------------------------- - - /** The Unsafe instance used for all raw memory accesses of this class. */ - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - /** Offset of the first element inside a byte array, as reported by Unsafe. */ - @SuppressWarnings("restriction") - private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - /** Upper bound for the number of bytes moved by one copyMemory call in the batched bulk methods. */ - private static final long COPY_PER_BATCH = 1024 * 1024; - - /** Reflective handle on the "address" field of java.nio.Buffer. */ - private static final Field ADDRESS_FIELD; - - static { - try { - ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address"); - ADDRESS_FIELD.setAccessible(true); - } - catch (Throwable t) { - // keep the cause, otherwise the reason for the failed lookup is lost - throw new RuntimeException("Cannot initialize DirectMemorySegment - direct memory not supported by the JVM.", t); - } - } - - /** - * Reads the value of the "address" field of the given buffer via reflection. - * - * @param buf The buffer whose address is read. - * @return The value of the buffer's address field. - * @throws RuntimeException Thrown, if the field cannot be read. - */ - private static long getAddress(ByteBuffer buf) { - try { - return (Long) ADDRESS_FIELD.get(buf); - } - catch (Throwable t) { - throw new RuntimeException("Could not access direct byte buffer address.", t); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java deleted file mode 100644 index cda48e1..0000000 --- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory.benchmarks; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.util.List; - -public final class PureHybridMemorySegmentOutView implements DataOutputView { - - private PureHybridMemorySegment currentSegment; // the current memory segment to write to - - private int positionInSegment; // the offset in the current segment - - private final int segmentSize; // the size of the memory segments - - private final List<PureHybridMemorySegment> memorySource; - - private final List<PureHybridMemorySegment> fullSegments; - - - private byte[] utfBuffer; // the reusable array for UTF encodings - - - public PureHybridMemorySegmentOutView(List<PureHybridMemorySegment> emptySegments, - List<PureHybridMemorySegment> fullSegmentTarget, int segmentSize) { - this.segmentSize = segmentSize; - this.currentSegment = emptySegments.remove(emptySegments.size() - 1); - - this.memorySource = emptySegments; - this.fullSegments = fullSegmentTarget; - this.fullSegments.add(getCurrentSegment()); - } - - - public void reset() { - if (this.fullSegments.size() != 0) { - throw new 
IllegalStateException("The target list still contains memory segments."); - } - - clear(); - try { - advance(); - } - catch (IOException ioex) { - throw new RuntimeException("Error getting first segment for record collector.", ioex); - } - } - - // -------------------------------------------------------------------------------------------- - // Page Management - // -------------------------------------------------------------------------------------------- - - public PureHybridMemorySegment nextSegment(PureHybridMemorySegment current, int positionInCurrent) throws EOFException { - int size = this.memorySource.size(); - if (size > 0) { - final PureHybridMemorySegment next = this.memorySource.remove(size - 1); - this.fullSegments.add(next); - return next; - } else { - throw new EOFException(); - } - } - - public PureHybridMemorySegment getCurrentSegment() { - return this.currentSegment; - } - - public int getCurrentPositionInSegment() { - return this.positionInSegment; - } - - public int getSegmentSize() { - return this.segmentSize; - } - - protected void advance() throws IOException { - this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment); - this.positionInSegment = 0; - } - - protected void seekOutput(PureHybridMemorySegment seg, int position) { - this.currentSegment = seg; - this.positionInSegment = position; - } - - protected void clear() { - this.currentSegment = null; - this.positionInSegment = 0; - } - - // -------------------------------------------------------------------------------------------- - // Data Output Specific methods - // -------------------------------------------------------------------------------------------- - - @Override - public void write(int b) throws IOException { - writeByte(b); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - int remaining = this.segmentSize - 
this.positionInSegment; - if (remaining >= len) { - this.currentSegment.put(this.positionInSegment, b, off, len); - this.positionInSegment += len; - } - else { - if (remaining == 0) { - advance(); - remaining = this.segmentSize - this.positionInSegment; - } - while (true) { - int toPut = Math.min(remaining, len); - this.currentSegment.put(this.positionInSegment, b, off, toPut); - off += toPut; - len -= toPut; - - if (len > 0) { - this.positionInSegment = this.segmentSize; - advance(); - remaining = this.segmentSize - this.positionInSegment; - } - else { - this.positionInSegment += toPut; - break; - } - } - } - } - - @Override - public void writeBoolean(boolean v) throws IOException { - writeByte(v ? 1 : 0); - } - - @Override - public void writeByte(int v) throws IOException { - if (this.positionInSegment < this.segmentSize) { - this.currentSegment.put(this.positionInSegment++, (byte) v); - } - else { - advance(); - writeByte(v); - } - } - - @Override - public void writeShort(int v) throws IOException { - if (this.positionInSegment < this.segmentSize - 1) { - this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v); - this.positionInSegment += 2; - } - else if (this.positionInSegment == this.segmentSize) { - advance(); - writeShort(v); - } - else { - writeByte(v >> 8); - writeByte(v); - } - } - - @Override - public void writeChar(int v) throws IOException { - if (this.positionInSegment < this.segmentSize - 1) { - this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v); - this.positionInSegment += 2; - } - else if (this.positionInSegment == this.segmentSize) { - advance(); - writeChar(v); - } - else { - writeByte(v >> 8); - writeByte(v); - } - } - - @Override - public void writeInt(int v) throws IOException { - if (this.positionInSegment < this.segmentSize - 3) { - this.currentSegment.putIntBigEndian(this.positionInSegment, v); - this.positionInSegment += 4; - } - else if (this.positionInSegment == this.segmentSize) { - advance(); - 
writeInt(v); - } - else { - writeByte(v >> 24); - writeByte(v >> 16); - writeByte(v >> 8); - writeByte(v); - } - } - - @Override - public void writeLong(long v) throws IOException { - if (this.positionInSegment < this.segmentSize - 7) { - this.currentSegment.putLongBigEndian(this.positionInSegment, v); - this.positionInSegment += 8; - } - else if (this.positionInSegment == this.segmentSize) { - advance(); - writeLong(v); - } - else { - writeByte((int) (v >> 56)); - writeByte((int) (v >> 48)); - writeByte((int) (v >> 40)); - writeByte((int) (v >> 32)); - writeByte((int) (v >> 24)); - writeByte((int) (v >> 16)); - writeByte((int) (v >> 8)); - writeByte((int) v); - } - } - - @Override - public void writeFloat(float v) throws IOException { - writeInt(Float.floatToRawIntBits(v)); - } - - @Override - public void writeDouble(double v) throws IOException { - writeLong(Double.doubleToRawLongBits(v)); - } - - @Override - public void writeBytes(String s) throws IOException { - for (int i = 0; i < s.length(); i++) { - writeByte(s.charAt(i)); - } - } - - @Override - public void writeChars(String s) throws IOException { - for (int i = 0; i < s.length(); i++) { - writeChar(s.charAt(i)); - } - } - - @Override - public void writeUTF(String str) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("encoded string too long: " + utflen + " memory"); - } - - if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) { - this.utfBuffer = new byte[utflen + 2]; - } - final byte[] bytearr = this.utfBuffer; - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) (utflen & 0xFF); - - int i = 0; - for (i = 0; i < strlen; 
i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | (c & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | (c & 0x3F)); - } - } - - write(bytearr, 0, utflen + 2); - } - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - while (numBytes > 0) { - final int remaining = this.segmentSize - this.positionInSegment; - if (numBytes <= remaining) { - this.positionInSegment += numBytes; - return; - } - this.positionInSegment = this.segmentSize; - advance(); - numBytes -= remaining; - } - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - while (numBytes > 0) { - final int remaining = this.segmentSize - this.positionInSegment; - if (numBytes <= remaining) { - this.currentSegment.put(source, this.positionInSegment, numBytes); - this.positionInSegment += numBytes; - return; - } - - if (remaining > 0) { - this.currentSegment.put(source, this.positionInSegment, remaining); - this.positionInSegment = this.segmentSize; - numBytes -= remaining; - } - - advance(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java deleted file mode 100644 index 1280242..0000000 --- 
a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java +++ /dev/null @@ -1,790 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory.benchmarks; - -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemoryUtils; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Field; -import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -public final class PureOffHeapMemorySegment { - - /** Constant that flags the byte order. Because this is a boolean constant, - * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */ - private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); - - /** The direct byte buffer that allocated the memory */ - private ByteBuffer buffer; - - /** The address to the off-heap data */ - private long address; - - /** The address one byte after the last addressable byte. 
- * This is address + size while the segment is not disposed */ - private final long addressLimit; - - /** The size in bytes of the memory segment */ - private final int size; - - // ------------------------------------------------------------------------- - // Constructors - // ------------------------------------------------------------------------- - - /** - * Creates a new memory segment that represents the memory backing the given direct byte buffer. - * Note that the given ByteBuffer must be direct (see {@link java.nio.ByteBuffer#allocateDirect(int)}), - * otherwise this constructor will throw an IllegalArgumentException. The size of the segment is - * the capacity of the buffer. - * - * @param buffer The byte buffer whose memory is represented by this memory segment. - * @throws IllegalArgumentException Thrown, if the given ByteBuffer is null or not direct. - * @throws RuntimeException Thrown, if the address of the buffer is not smaller than - * {@code Long.MAX_VALUE - Integer.MAX_VALUE}. - */ - public PureOffHeapMemorySegment(ByteBuffer buffer) { - if (buffer == null || !buffer.isDirect()) { - throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer."); - } - - this.buffer = buffer; - this.size = buffer.capacity(); - this.address = getAddress(buffer); - this.addressLimit = this.address + size; - - if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) { - throw new RuntimeException("Segment initialized with too large address: " + address); - } - } - - // ------------------------------------------------------------------------- - // Direct Memory Segment Specifics - // ------------------------------------------------------------------------- - - /** - * Gets the buffer that owns the memory of this memory segment. - * - * @return The byte buffer that owns the memory of this memory segment. - */ - public ByteBuffer getBuffer() { - return this.buffer; - } - - /** - * Gets the memory address of the memory backing this memory segment. - * - * @return The memory start address of the memory backing this memory segment. 
- */ - public long getAddress() { - return address; - } - - // ------------------------------------------------------------------------- - // MemorySegment Accessors - // ------------------------------------------------------------------------- - - public final boolean isFreed() { - return this.address > this.addressLimit; - } - - public final void free() { - // this ensures we can place no more data and trigger - // the checks for the freed segment - this.address = this.addressLimit + 1; - this.buffer = null; - } - - public final int size() { - return this.size; - } - - public ByteBuffer wrap(int offset, int length) { - if (offset < 0 || offset > this.size || offset > this.size - length) { - throw new IndexOutOfBoundsException(); - } - - this.buffer.limit(offset + length); - this.buffer.position(offset); - - return this.buffer; - } - - - // ------------------------------------------------------------------------ - // Random Access get() and put() methods - // ------------------------------------------------------------------------ - - @SuppressWarnings("restriction") - public final byte get(int index) { - - final long pos = address + index; - if (index >= 0 && pos < addressLimit) { - return UNSAFE.getByte(pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - @SuppressWarnings("restriction") - public final void put(int index, byte b) { - - final long pos = address + index; - if (index >= 0 && pos < addressLimit) { - UNSAFE.putByte(pos, b); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void get(int index, byte[] dst) { - get(index, dst, 0, dst.length); - } - - public final void put(int index, byte[] src) { - put(index, src, 0, src.length); - } - 
- @SuppressWarnings("restriction") - public final void get(int index, byte[] dst, int offset, int length) { - - // check the byte array offset and length - if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) { - throw new IndexOutOfBoundsException(); - } - - long pos = address + index; - - if (index >= 0 && pos <= addressLimit - length) { - long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; - - // the copy must proceed in batches not too large, because the JVM may - // poll for points that are safe for GC (moving the array and changing its address) - while (length > 0) { - long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length; - UNSAFE.copyMemory(null, pos, dst, arrayAddress, toCopy); - length -= toCopy; - pos += toCopy; - arrayAddress += toCopy; - } - } - else if (address <= 0) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - @SuppressWarnings("restriction") - public final void put(int index, byte[] src, int offset, int length) { - // check the byte array offset and length - if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) { - throw new IndexOutOfBoundsException(); - } - - long pos = address + index; - - if (index >= 0 && pos <= addressLimit - length) { - - long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; - while (length > 0) { - long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length; - UNSAFE.copyMemory(src, arrayAddress, null, pos, toCopy); - length -= toCopy; - pos += toCopy; - arrayAddress += toCopy; - } - } - else if (address <= 0) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final boolean getBoolean(int index) { - return get(index) != 0; - } - - public final void putBoolean(int index, boolean value) { - put(index, (byte) (value ? 
1 : 0)); - } - - @SuppressWarnings("restriction") - public final char getChar(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - return UNSAFE.getChar(pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final char getCharLittleEndian(int index) { - if (LITTLE_ENDIAN) { - return getChar(index); - } else { - return Character.reverseBytes(getChar(index)); - } - } - - public final char getCharBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Character.reverseBytes(getChar(index)); - } else { - return getChar(index); - } - } - - @SuppressWarnings("restriction") - public final void putChar(int index, char value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - UNSAFE.putChar(pos, value); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putCharLittleEndian(int index, char value) { - if (LITTLE_ENDIAN) { - putChar(index, value); - } else { - putChar(index, Character.reverseBytes(value)); - } - } - - public final void putCharBigEndian(int index, char value) { - if (LITTLE_ENDIAN) { - putChar(index, Character.reverseBytes(value)); - } else { - putChar(index, value); - } - } - - @SuppressWarnings("restriction") - public final short getShort(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - return UNSAFE.getShort(pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final short getShortLittleEndian(int index) { - if (LITTLE_ENDIAN) { - return 
getShort(index); - } else { - return Short.reverseBytes(getShort(index)); - } - } - - public final short getShortBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Short.reverseBytes(getShort(index)); - } else { - return getShort(index); - } - } - - @SuppressWarnings("restriction") - public final void putShort(int index, short value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 2) { - UNSAFE.putShort(pos, value); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putShortLittleEndian(int index, short value) { - if (LITTLE_ENDIAN) { - putShort(index, value); - } else { - putShort(index, Short.reverseBytes(value)); - } - } - - public final void putShortBigEndian(int index, short value) { - if (LITTLE_ENDIAN) { - putShort(index, Short.reverseBytes(value)); - } else { - putShort(index, value); - } - } - - @SuppressWarnings("restriction") - public final int getInt(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 4) { - return UNSAFE.getInt(pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final int getIntLittleEndian(int index) { - if (LITTLE_ENDIAN) { - return getInt(index); - } else { - return Integer.reverseBytes(getInt(index)); - } - } - - public final int getIntBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Integer.reverseBytes(getInt(index)); - } else { - return getInt(index); - } - } - - @SuppressWarnings("restriction") - public final void putInt(int index, int value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 4) { - UNSAFE.putInt(pos, value); - } - else if (address > addressLimit) { - throw new 
IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putIntLittleEndian(int index, int value) { - if (LITTLE_ENDIAN) { - putInt(index, value); - } else { - putInt(index, Integer.reverseBytes(value)); - } - } - - public final void putIntBigEndian(int index, int value) { - if (LITTLE_ENDIAN) { - putInt(index, Integer.reverseBytes(value)); - } else { - putInt(index, value); - } - } - - @SuppressWarnings("restriction") - public final long getLong(int index) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 8) { - return UNSAFE.getLong(pos); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final long getLongLittleEndian(int index) { - if (LITTLE_ENDIAN) { - return getLong(index); - } else { - return Long.reverseBytes(getLong(index)); - } - } - - public final long getLongBigEndian(int index) { - if (LITTLE_ENDIAN) { - return Long.reverseBytes(getLong(index)); - } else { - return getLong(index); - } - } - - @SuppressWarnings("restriction") - public final void putLong(int index, long value) { - final long pos = address + index; - if (index >= 0 && pos <= addressLimit - 8) { - UNSAFE.putLong(pos, value); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - - public final void putLongLittleEndian(int index, long value) { - if (LITTLE_ENDIAN) { - putLong(index, value); - } else { - putLong(index, Long.reverseBytes(value)); - } - } - - public final void putLongBigEndian(int index, long value) { - if (LITTLE_ENDIAN) { - putLong(index, Long.reverseBytes(value)); - } else { - putLong(index, value); - } - } - - public final float 
getFloat(int index) { - return Float.intBitsToFloat(getInt(index)); - } - - public final float getFloatLittleEndian(int index) { - return Float.intBitsToFloat(getIntLittleEndian(index)); - } - - public final float getFloatBigEndian(int index) { - return Float.intBitsToFloat(getIntBigEndian(index)); - } - - public final void putFloat(int index, float value) { - putInt(index, Float.floatToRawIntBits(value)); - } - - public final void putFloatLittleEndian(int index, float value) { - putIntLittleEndian(index, Float.floatToRawIntBits(value)); - } - - public final void putFloatBigEndian(int index, float value) { - putIntBigEndian(index, Float.floatToRawIntBits(value)); - } - - public final double getDouble(int index) { - return Double.longBitsToDouble(getLong(index)); - } - - public final double getDoubleLittleEndian(int index) { - return Double.longBitsToDouble(getLongLittleEndian(index)); - } - - public final double getDoubleBigEndian(int index) { - return Double.longBitsToDouble(getLongBigEndian(index)); - } - - public final void putDouble(int index, double value) { - putLong(index, Double.doubleToRawLongBits(value)); - } - - public final void putDoubleLittleEndian(int index, double value) { - putLongLittleEndian(index, Double.doubleToRawLongBits(value)); - } - - public final void putDoubleBigEndian(int index, double value) { - putLongBigEndian(index, Double.doubleToRawLongBits(value)); - } - - // ------------------------------------------------------------------------- - // Bulk Read and Write Methods - // ------------------------------------------------------------------------- - - public final void get(DataOutput out, int offset, int length) throws IOException { - while (length >= 8) { - out.writeLong(getLongBigEndian(offset)); - offset += 8; - length -= 8; - } - - while(length > 0) { - out.writeByte(get(offset)); - offset++; - length--; - } - } - - public final void put(DataInput in, int offset, int length) throws IOException { - while (length >= 8) { - 
putLongBigEndian(offset, in.readLong()); - offset += 8; - length -= 8; - } - while(length > 0) { - put(offset, in.readByte()); - offset++; - length--; - } - } - - @SuppressWarnings("restriction") - public final void get(int offset, ByteBuffer target, int numBytes) { - - // check the byte array offset and length - if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { - throw new IndexOutOfBoundsException(); - } - - final int targetOffset = target.position(); - final int remaining = target.remaining(); - - if (remaining < numBytes) { - throw new BufferOverflowException(); - } - - if (target.isDirect()) { - // copy to the target memory directly - final long targetPointer = getAddress(target) + targetOffset; - final long sourcePointer = address + offset; - - if (sourcePointer <= addressLimit - numBytes) { - UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - throw new IndexOutOfBoundsException(); - } - } - else if (target.hasArray()) { - // move directly into the byte array - get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes); - - // this must be after the get() call to ensue that the byte buffer is not - // modified in case the call fails - target.position(targetOffset + numBytes); - } - else { - // neither heap buffer nor direct buffer - while (target.hasRemaining()) { - target.put(get(offset++)); - } - } - } - - @SuppressWarnings("restriction") - public final void put(int offset, ByteBuffer source, int numBytes) { - - // check the byte array offset and length - if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) { - throw new IndexOutOfBoundsException(); - } - - final int sourceOffset = source.position(); - final int remaining = source.remaining(); - - if (remaining < numBytes) { - throw new BufferUnderflowException(); - } - - if (source.isDirect()) { - // 
copy to the target memory directly - final long sourcePointer = getAddress(source) + sourceOffset; - final long targetPointer = address + offset; - - if (sourcePointer <= addressLimit - numBytes) { - UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes); - } - else if (address > addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - throw new IndexOutOfBoundsException(); - } - } - else if (source.hasArray()) { - // move directly into the byte array - put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes); - - // this must be after the get() call to ensue that the byte buffer is not - // modified in case the call fails - source.position(sourceOffset + numBytes); - } - else { - // neither heap buffer nor direct buffer - while (source.hasRemaining()) { - put(offset++, source.get()); - } - } - } - - @SuppressWarnings("restriction") - public final void copyTo(int offset, PureOffHeapMemorySegment target, int targetOffset, int numBytes) { - final long thisPointer = address + offset; - final long otherPointer = target.address + targetOffset; - - if (numBytes >= 0 && thisPointer <= addressLimit - numBytes && otherPointer <= target.addressLimit - numBytes) { - UNSAFE.copyMemory(thisPointer, otherPointer, numBytes); - } - else if (address > addressLimit || target.address > target.addressLimit) { - throw new IllegalStateException("This segment has been freed."); - } - else { - throw new IndexOutOfBoundsException(); - } - } - - public int compare(MemorySegment seg2, int offset1, int offset2, int len) { - while (len >= 8) { - long l1 = this.getLongBigEndian(offset1); - long l2 = seg2.getLongBigEndian(offset2); - - if (l1 != l2) { - return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? 
-1 : 1; - } - - offset1 += 8; - offset2 += 8; - len -= 8; - } - while (len > 0) { - int b1 = this.get(offset1) & 0xff; - int b2 = seg2.get(offset2) & 0xff; - int cmp = b1 - b2; - if (cmp != 0) { - return cmp; - } - offset1++; - offset2++; - len--; - } - return 0; - } - - public void swapBytes(byte[] tempBuffer, PureOffHeapMemorySegment seg2, int offset1, int offset2, int len) { - if (len < 32) { - // fast path for short copies - while (len >= 8) { - long tmp = this.getLong(offset1); - this.putLong(offset1, seg2.getLong(offset2)); - seg2.putLong(offset2, tmp); - offset1 += 8; - offset2 += 8; - len -= 8; - } - while (len > 0) { - byte tmp = this.get(offset1); - this.put(offset1, seg2.get(offset2)); - seg2.put(offset2, tmp); - offset1++; - offset2++; - len--; - } - } - else if ( (offset1 | offset2 | len | (offset1 + len) | (offset2 + len) | - (this.size - (offset1 + len)) | (seg2.size() - (offset2 + len))) < 0 || len > tempBuffer.length) - { - throw new IndexOutOfBoundsException(); - } - else { - final long thisPos = this.address + offset1; - final long otherPos = seg2.address + offset2; - - if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) { - final long arrayAddress = BYTE_ARRAY_BASE_OFFSET; - - // this -> temp buffer - UNSAFE.copyMemory(null, thisPos, tempBuffer, arrayAddress, len); - - // other -> this - UNSAFE.copyMemory(null, otherPos, null, thisPos, len); - - // temp buffer -> other - UNSAFE.copyMemory(tempBuffer, arrayAddress, null, otherPos, len); - } - else if (this.address <= 0 || seg2.address <= 0) { - throw new IllegalStateException("Memory segment has been freed."); - } - else { - // index is in fact invalid - throw new IndexOutOfBoundsException(); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Utilities for native memory accesses and checks - // -------------------------------------------------------------------------------------------- - - 
@SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final long COPY_PER_BATCH = 1024 * 1024; - - private static final Field ADDRESS_FIELD; - - static { - try { - ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address"); - ADDRESS_FIELD.setAccessible(true); - } - catch (Throwable t) { - throw new RuntimeException("Cannot initialize DirectMemorySegment - direct memory not supported by the JVM."); - } - } - - private static long getAddress(ByteBuffer buf) { - try { - return (Long) ADDRESS_FIELD.get(buf); - } - catch (Throwable t) { - throw new RuntimeException("Could not access direct byte buffer address.", t); - } - } -}
