Repository: ignite Updated Branches: refs/heads/ignite-direct-marsh-opt f33e64309 -> 393b630ef
Optimizations for direct marshalling Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/393b630e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/393b630e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/393b630e Branch: refs/heads/ignite-direct-marsh-opt Commit: 393b630ef976bf59844dfb00a2c9aa2e29ad7d6f Parents: f33e643 Author: Valentin Kulichenko <valentin.kuliche...@gmail.com> Authored: Mon Nov 16 22:04:57 2015 -0800 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Mon Nov 16 22:04:57 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 307 ++++++++++++++----- .../internal/direct/DirectMessageWriter.java | 9 +- 2 files changed, 234 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/393b630e/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index ccbfe09..e05e20d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -23,8 +23,9 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; +import java.util.RandomAccess; import java.util.UUID; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; @@ -256,7 +257,7 @@ public class DirectByteBufferStream { private Iterator<?> it; /** */ - private Iterator<?> arrIt; + private int arrPos = -1; /** */ private Object arrCur = NULL; @@ -292,6 +293,18 @@ public class DirectByteBufferStream { private int primShift; /** */ + private int uuidState; + + /** */ + private long uuidMost; + + /** */ + private long uuidLeast; + + /** */ + private long uuidLocId; + + /** */ private boolean lastFinished; /** */ @@ -503,7 +516,7 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeShortArray(short[] val) { if (val != null) @@ -513,7 +526,7 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeIntArray(int[] val) { if (val != null) @@ -523,7 +536,7 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeLongArray(long[] val) { if (val != null) @@ -533,7 +546,7 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeFloatArray(float[] val) { if (val != null) @@ -543,7 +556,7 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeDoubleArray(double[] val) { if (val != null) @@ -553,7 +566,7 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeCharArray(char[] val) { if (val != null) @@ -563,7 +576,7 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeBooleanArray(boolean[] val) { if (val != null) @@ -573,31 +586,87 @@ public class DirectByteBufferStream { } /** - * @param val Value + * @param val Value. */ public void writeString(String val) { writeByteArray(val != null ? val.getBytes() : null); } /** - * @param val Value + * @param val Value. */ public void writeBitSet(BitSet val) { writeLongArray(val != null ? val.toLongArray() : null); } /** - * @param val Value + * @param val Value. */ public void writeUuid(UUID val) { - writeByteArray(val != null ? U.uuidToBytes(val) : null); + switch (uuidState) { + case 0: + writeBoolean(val == null); + + if (!lastFinished || val == null) + return; + + uuidState++; + + case 1: + writeLong(val.getMostSignificantBits()); + + if (!lastFinished) + return; + + uuidState++; + + case 2: + writeLong(val.getLeastSignificantBits()); + + if (!lastFinished) + return; + + uuidState = 0; + } } /** - * @param val Value + * @param val Value. */ public void writeIgniteUuid(IgniteUuid val) { - writeByteArray(val != null ? U.igniteUuidToBytes(val) : null); + switch (uuidState) { + case 0: + writeBoolean(val == null); + + if (!lastFinished || val == null) + return; + + uuidState++; + + case 1: + writeLong(val.globalId().getMostSignificantBits()); + + if (!lastFinished) + return; + + uuidState++; + + case 2: + writeLong(val.globalId().getLeastSignificantBits()); + + if (!lastFinished) + return; + + uuidState++; + + case 3: + writeLong(val.localId()); + + if (!lastFinished) + return; + + uuidState = 0; + } } /** @@ -629,18 +698,20 @@ public class DirectByteBufferStream { */ public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) { if (arr != null) { - if (arrIt == null) { - writeInt(arr.length); + int len = arr.length; + + if (arrPos == -1) { + writeInt(len); if (!lastFinished) return; - arrIt = arrayIterator(arr); + arrPos = 0; } - while (arrIt.hasNext() || arrCur != NULL) { + while (arrPos < len || arrCur != NULL) { if (arrCur == NULL) - arrCur = arrIt.next(); + arrCur = arr[arrPos++]; write(itemType, arrCur, writer); @@ -650,7 +721,7 @@ public class DirectByteBufferStream { arrCur = NULL; } - arrIt = null; + arrPos = -1; } else writeInt(-1); @@ -691,6 +762,44 @@ public class DirectByteBufferStream { } /** + * @param list List. + * @param itemType Component type. + * @param writer Writer. + */ + public <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) { + if (list != null) { + assert list instanceof RandomAccess; + + int size = list.size(); + + if (arrPos == -1) { + writeInt(size); + + if (!lastFinished) + return; + + arrPos = 0; + } + + while (arrPos < size || arrCur != NULL) { + if (arrCur == NULL) + arrCur = list.get(arrPos++); + + write(itemType, arrCur, writer); + + if (!lastFinished) + return; + + arrCur = NULL; + } + + arrPos = -1; + } + else + writeInt(-1); + } + + /** * @param map Map. * @param keyType Key type. * @param valType Value type. @@ -783,20 +892,17 @@ public class DirectByteBufferStream { int val = 0; - int initPos = buf.position(); - int shift = 0; - while (buf.hasRemaining()) { - byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift); + int pos = buf.position(); - prim |= ((long)b & 0x7F) << (7 * primShift); + byte b = UNSAFE.getByte(heapArr, baseOff + pos); + + buf.position(pos + 1); - primShift++; - shift++; + prim |= ((long)b & 0x7F) << (7 * primShift); if ((b & 0x80) == 0) { lastFinished = true; - primShift = 0; val = (int)prim; @@ -806,13 +912,14 @@ public class DirectByteBufferStream { val--; prim = 0; + primShift = 0; break; } + else + primShift++; } - buf.position(initPos + shift); - return val; } @@ -824,20 +931,17 @@ public class DirectByteBufferStream { long val = 0; - int initPos = buf.position(); - int shift = 0; - while (buf.hasRemaining()) { - byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift); + int pos = buf.position(); - prim |= ((long)b & 0x7F) << (7 * primShift); + byte b = UNSAFE.getByte(heapArr, baseOff + pos); + + buf.position(pos + 1); - shift++; - primShift++; + prim |= ((long)b & 0x7F) << (7 * primShift); if ((b & 0x80) == 0) { lastFinished = true; - primShift = 0; val = prim; @@ -847,13 +951,14 @@ public class DirectByteBufferStream { val--; prim = 0; + primShift = 0; break; } + else + primShift++; } - buf.position(initPos + shift); - return val; } @@ -1004,18 +1109,84 @@ public class DirectByteBufferStream { * @return Value. */ public UUID readUuid() { - byte[] arr = readByteArray(); + switch (uuidState) { + case 0: + boolean isNull = readBoolean(); + + if (!lastFinished || isNull) + return null; + + uuidState++; + + case 1: + uuidMost = readLong(); + + if (!lastFinished) + return null; + + uuidState++; + + case 2: + uuidLeast = readLong(); - return arr != null ? U.bytesToUuid(arr, 0) : null; + if (!lastFinished) + return null; + + uuidState = 0; + } + + UUID val = new UUID(uuidMost, uuidLeast); + + uuidMost = 0; + uuidLeast = 0; + + return val; } /** * @return Value. */ public IgniteUuid readIgniteUuid() { - byte[] arr = readByteArray(); + switch (uuidState) { + case 0: + boolean isNull = readBoolean(); - return arr != null ? U.bytesToIgniteUuid(arr, 0) : null; + if (!lastFinished || isNull) + return null; + + uuidState++; + + case 1: + uuidMost = readLong(); + + if (!lastFinished) + return null; + + uuidState++; + + case 2: + uuidLeast = readLong(); + + if (!lastFinished) + return null; + + uuidState++; + + case 3: + uuidLocId = readLong(); + + if (!lastFinished) + return null; + + uuidState = 0; + } + + IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId); + + uuidMost = 0; + uuidLeast = 0; + + return val; } /** @@ -1204,7 +1375,8 @@ public class DirectByteBufferStream { * @return Whether array was fully written. */ private boolean writeArray(Object arr, long off, int len, int bytes) { - assert arr == null || (arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive()); + assert arr != null; + assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive(); assert off > 0; assert len >= 0; assert bytes >= 0; @@ -1224,24 +1396,24 @@ public class DirectByteBufferStream { int remaining = buf.remaining(); if (toWrite <= remaining) { - UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite); - - pos += toWrite; + if (toWrite > 0) { + UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite); - buf.position(pos); + buf.position(pos + toWrite); + } arrOff = -1; return true; } else { - UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining); - - pos += remaining; + if (remaining > 0) { + UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining); - buf.position(pos); + buf.position(pos + remaining); - arrOff += remaining; + arrOff += remaining; + } return false; } @@ -1511,31 +1683,6 @@ public class DirectByteBufferStream { } /** - * @param arr Array. - * @return Array iterator. - */ - private Iterator<?> arrayIterator(final Object[] arr) { - return new Iterator<Object>() { - private int idx; - - @Override public boolean hasNext() { - return idx < arr.length; - } - - @Override public Object next() { - if (!hasNext()) - throw new NoSuchElementException(); - - return arr[idx++]; - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - /** * Array creator. */ private static interface ArrayCreator<T> { http://git-wip-us.apache.org/repos/asf/ignite/blob/393b630e/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index ea0f37e..8ad7042 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.direct; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.RandomAccess; import java.util.UUID; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; @@ -213,7 +215,10 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) { - stream.writeCollection(col, itemType, this); + if (col instanceof List && col instanceof RandomAccess) + stream.writeRandomAccessList((List<T>)col, itemType, this); + else + stream.writeCollection(col, itemType, this); return stream.lastFinished(); } @@ -260,4 +265,4 @@ public class DirectMessageWriter implements MessageWriter { @Override public void reset() { state.reset(); } -} \ No newline at end of file +}