IGNITE-1917: Added methods for unsafe writes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b48b585 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b48b585 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b48b585 Branch: refs/heads/ignite-1917 Commit: 1b48b58571bb745d7ce95d7395d85ad553206f58 Parents: 3f19b51 Author: vozerov-gridgain <[email protected]> Authored: Mon Nov 16 15:09:55 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Nov 16 15:09:55 2015 +0300 ---------------------------------------------------------------------- .../streams/PortableAbstractOutputStream.java | 47 ++++++---- .../streams/PortableHeapOutputStream.java | 35 ++++++- .../streams/PortableOffheapOutputStream.java | 39 +++++++- .../portable/streams/PortableOutputStream.java | 99 ++++++++++++++++++++ .../PlatformBigEndianOutputStreamImpl.java | 20 ++++ .../memory/PlatformOutputStreamImpl.java | 61 ++++++++++++ 6 files changed, 277 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1b48b585/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java index c943682..8feaf35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java @@ -123,14 +123,14 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea @Override public void writeShort(int pos, short val) { ensureCapacity(pos + 2); - writeShortPositioned(pos, val); + unsafeWriteShort(pos, val); } /** {@inheritDoc} */ @Override public void writeInt(int pos, int val) { ensureCapacity(pos + 4); - writeIntPositioned(pos, val); + unsafeWriteInt(pos, val); } /** {@inheritDoc} */ @@ -247,6 +247,33 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea return 0; } + /** {@inheritDoc} */ + @Override public int unsafeStart(int cap) { + ensureCapacity(pos + cap); + + return pos; + } + + /** {@inheritDoc} */ + @Override public void unsafeStop(int pos) { + position(pos); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteBoolean(int pos, boolean val) { + unsafeWriteByte(pos, val ? BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteFloat(int pos, float val) { + unsafeWriteInt(pos, Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteDouble(int pos, double val) { + unsafeWriteLong(pos, Double.doubleToLongBits(val)); + } + /** * Calculate new capacity. * @@ -314,22 +341,6 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea protected abstract void writeLongFast(long val); /** - * Write short value to the given position. - * - * @param pos Position. - * @param val Value. - */ - protected abstract void writeShortPositioned(int pos, short val); - - /** - * Write int value to the given position. - * - * @param pos Position. - * @param val Value. - */ - protected abstract void writeIntPositioned(int pos, int val); - - /** * Ensure capacity. * * @param cnt Required byte count. http://git-wip-us.apache.org/repos/asf/ignite/blob/1b48b585/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java index 51d6c85..f45a637 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java @@ -114,7 +114,12 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream } /** {@inheritDoc} */ - @Override protected void writeShortPositioned(int pos, short val) { + @Override public void unsafeWriteByte(int pos, byte val) { + UNSAFE.putByte(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { if (!LITTLE_ENDIAN) val = Short.reverseBytes(val); @@ -122,10 +127,36 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream } /** {@inheritDoc} */ - @Override protected void writeIntPositioned(int pos, int val) { + @Override public void unsafeWriteChar(int pos, char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { if (!LITTLE_ENDIAN) val = Integer.reverseBytes(val); UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(int pos, long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWrite(int pos, byte[] arr, int off, int cnt) { + UNSAFE.copyMemory(arr, BYTE_ARR_OFF + off, data, BYTE_ARR_OFF + pos, cnt); + } + + /** {@inheritDoc} */ + @Override public void unsafeWrite(int pos, long addr, int cnt) { + UNSAFE.copyMemory(null, addr, data, BYTE_ARR_OFF + pos, cnt); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1b48b585/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java index 430a176..59acfcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java @@ -125,7 +125,17 @@ public class PortableOffheapOutputStream extends PortableAbstractOutputStream { } /** {@inheritDoc} */ - @Override protected void writeShortPositioned(int pos, short val) { + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteByte(int pos, byte val) { + UNSAFE.putByte(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { if (!LITTLE_ENDIAN) val = Short.reverseBytes(val); @@ -133,7 +143,15 @@ public class PortableOffheapOutputStream extends PortableAbstractOutputStream { } /** {@inheritDoc} */ - @Override protected void writeIntPositioned(int pos, int val) { + @Override public void unsafeWriteChar(int pos, char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { if (!LITTLE_ENDIAN) val = Integer.reverseBytes(val); @@ -141,8 +159,21 @@ public class PortableOffheapOutputStream extends PortableAbstractOutputStream { } /** {@inheritDoc} */ - @Override public boolean hasArray() { - return false; + @Override public void unsafeWriteLong(int pos, long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWrite(int pos, byte[] arr, int off, int cnt) { + UNSAFE.copyMemory(arr, BYTE_ARR_OFF + off, null, ptr + pos, cnt); + } + + /** {@inheritDoc} */ + @Override public void unsafeWrite(int pos, long addr, int cnt) { + UNSAFE.copyMemory(null, addr, null, ptr + pos, cnt); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1b48b585/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java index 0e25b12..ccccfa6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java @@ -170,4 +170,103 @@ public interface PortableOutputStream extends PortableStream, AutoCloseable { * Close the stream releasing resources. */ @Override public void close(); + + /** + * Start write in unsafe mode. Ensures there is enough data to write at least the given amount of bytes. + * + * @param cap Capacity. + * @return Start position. + */ + public int unsafeStart(int cap); + + /** + * Finish write in unsafe mode. Essentially it just shifts position to the given location. But this method is + * more convenient because allows for easy error detection when paired with {@link #unsafeStart(int)}. + * + * @param pos Final absolute position. + */ + public void unsafeStop(int pos); + + /** + * Write byte in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteByte(int pos, byte val); + + /** + * Write boolean in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteBoolean(int pos, boolean val); + + /** + * Write short in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteShort(int pos, short val); + + /** + * Write char in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteChar(int pos, char val); + + /** + * Write int in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteInt(int pos, int val); + + /** + * Write long in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteLong(int pos, long val); + + /** + * Write float in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteFloat(int pos, float val); + + /** + * Write double in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteDouble(int pos, double val); + + /** + * Write array in unsafe mode. + * + * @param pos Position. + * @param arr Value. + * @param off Offset. + * @param cnt Amount of bytes to write. + */ + public void unsafeWrite(int pos, byte[] arr, int off, int cnt); + + /** + * Write raw memory in unsafe mode. + * + * @param pos Position. + * @param addr Address. + * @param cnt Amount of bytes to write. + */ + public void unsafeWrite(int pos, long addr, int cnt); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1b48b585/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java index e5fd71b..3f8e85a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java @@ -163,4 +163,24 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl shift(cnt); } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { + UNSAFE.putShort(data + pos, Short.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(int pos, char val) { + UNSAFE.putChar(data + pos, Character.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { + UNSAFE.putInt(data + pos, Integer.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(int pos, long val) { + UNSAFE.putLong(data + pos, val, Long.reverseBytes(val)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1b48b585/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java index 16b1567..70e1dd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java @@ -223,6 +223,67 @@ public class PlatformOutputStreamImpl implements PlatformOutputStream { } /** {@inheritDoc} */ + @Override public int unsafeStart(int cap) { + ensureCapacity(pos + cap); + + return pos; + } + + /** {@inheritDoc} */ + @Override public void unsafeStop(int pos) { + position(pos); + } + + @Override public void unsafeWriteByte(int pos, byte val) { + UNSAFE.putByte(data + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteBoolean(int pos, boolean val) { + unsafeWriteByte(pos, val ? (byte)1 : (byte)0); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { + UNSAFE.putShort(data + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(int pos, char val) { + UNSAFE.putChar(data + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { + UNSAFE.putInt(data + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(int pos, long val) { + UNSAFE.putLong(data + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteFloat(int pos, float val) { + unsafeWriteInt(pos, Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteDouble(int pos, double val) { + unsafeWriteLong(pos, Double.doubleToLongBits(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWrite(int pos, byte[] arr, int off, int cnt) { + UNSAFE.copyMemory(arr, BYTE_ARR_OFF + off, null, data + pos, cnt); + } + + /** {@inheritDoc} */ + @Override public void unsafeWrite(int pos, long addr, int cnt) { + UNSAFE.copyMemory(null, addr, null, data + pos, cnt); + } + + /** {@inheritDoc} */ @Override public void synchronize() { PlatformMemoryUtils.length(mem.pointer(), pos); }
