IGNITE-61 - Client fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b8e35888 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b8e35888 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b8e35888 Branch: refs/heads/ignite-61 Commit: b8e35888876125c8ffeda7428ffe2da09fa3c8bd Parents: c12c674 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Fri Feb 6 14:11:54 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Fri Feb 6 14:11:54 2015 -0800 ---------------------------------------------------------------------- .../CommunicationMessageCodeGenerator.java | 2 +- .../GridClientHandshakeRequestWrapper.java | 36 ++++------- .../GridClientHandshakeResponseWrapper.java | 2 - .../message/GridClientMessageWrapper.java | 64 +++++++++++++------- .../tcp/GridMemcachedMessageWrapper.java | 2 +- .../internal/util/GridClientByteUtils.java | 3 +- .../GridTcpCommunicationByteBufferStream.java | 35 ++++++++--- 7 files changed, 85 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b8e35888/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java index decac32..b861806 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java @@ -60,7 +60,7 @@ public class CommunicationMessageCodeGenerator { }; /** */ - private static final String SRC_DIR = U.getGridGainHome() + "/modules/core/src/main/java"; + private static final String SRC_DIR = U.getIgniteHome() + "/modules/core/src/main/java"; /** */ private static final Class<?> BASE_CLS = GridTcpCommunicationMessageAdapter.class; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b8e35888/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java index 1c0366a..b7b986f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.rest.client.message; import org.apache.ignite.internal.util.direct.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.grid.util.direct.*; import java.nio.*; @@ -32,6 +33,9 @@ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessa /** Signal char. */ public static final byte HANDSHAKE_HEADER = (byte)0x91; + /** Stream. */ + private final GridTcpCommunicationByteBufferStream stream = new GridTcpCommunicationByteBufferStream(null); + /** Handshake bytes. */ private byte[] bytes; @@ -59,43 +63,29 @@ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessa /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); if (!commState.typeWritten) { - if (!commState.putByte(null, directType())) + if (!buf.hasRemaining()) return false; + stream.writeByte(directType()); + commState.typeWritten = true; } - switch (commState.idx) { - case 0: - if (!commState.putByteArray("bytes", bytes)) - return false; - - commState.idx++; - - } + stream.writeByteArray(bytes, 0, bytes.length); - return true; + return stream.lastFinished(); } /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); - switch (commState.idx) { - case 0: - bytes = commState.getByteArray("bytes"); - - if (!commState.lastRead()) - return false; - - commState.idx++; - - } + bytes = stream.readByteArray(GridClientHandshakeRequest.PACKET_SIZE); - return true; + return stream.lastFinished(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b8e35888/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java index 8bb10a3..7ce67b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java @@ -62,8 +62,6 @@ public class GridClientHandshakeResponseWrapper extends GridTcpCommunicationMess /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b8e35888/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java index 4ba6a4f..fe32468 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.rest.client.message; import org.apache.ignite.internal.util.direct.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.grid.util.direct.*; import java.nio.*; import java.util.*; @@ -33,6 +34,9 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter /** Client request header. */ public static final byte REQ_HEADER = (byte)0x90; + /** Stream. */ + private final GridTcpCommunicationByteBufferStream stream = new GridTcpCommunicationByteBufferStream(null); + /** */ private int msgSize; @@ -130,43 +134,52 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); if (!commState.typeWritten) { - if (!commState.putByte(null, directType())) + if (stream.remaining() < 1) return false; + stream.writeByte(directType()); + commState.typeWritten = true; } switch (commState.idx) { case 0: - if (!commState.putInt("msgSize", msgSize)) + if (stream.remaining() < 4) return false; + stream.writeInt(msgSize); + commState.idx++; case 1: - if (!commState.putLong("reqId", reqId)) + if (stream.remaining() < 8) return false; + stream.writeLong(reqId); + commState.idx++; case 2: - if (!commState.putUuid("clientId", clientId)) + if (stream.remaining() < 16) return false; + stream.writeByteArray(U.uuidToBytes(clientId), 0, 16); + commState.idx++; case 3: - if (!commState.putUuid("destId", destId)) + if (stream.remaining() < 16) return false; + stream.writeByteArray(U.uuidToBytes(destId), 0, 16); + commState.idx++; case 4: - if (!commState.putByteBuffer("msg", msg)) - return false; + stream.writeByteArray(msg.array(), msg.position(), msg.remaining()); commState.idx++; @@ -177,47 +190,52 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); switch (commState.idx) { case 0: - msgSize = commState.getInt("msgSize"); - - if (!commState.lastRead()) + if (stream.remaining() < 4) return false; + msgSize = stream.readInt(); + + if (msgSize == 0) // Ping message. + return true; + commState.idx++; case 1: - reqId = commState.getLong("reqId"); - - if (!commState.lastRead()) + if (stream.remaining() < 8) return false; + reqId = stream.readLong(); + commState.idx++; case 2: - clientId = commState.getUuid("clientId"); - - if (!commState.lastRead()) + if (stream.remaining() < 16) return false; + clientId = U.bytesToUuid(stream.readByteArray(16), 0); + commState.idx++; case 3: - destId = commState.getUuid("destId"); - - if (!commState.lastRead()) + if (stream.remaining() < 16) return false; + destId = U.bytesToUuid(stream.readByteArray(16), 0); + commState.idx++; case 4: - msg = commState.getByteBuffer("msg"); + byte[] msg0 = stream.readByteArray(msgSize); - if (!commState.lastRead()) + if (!stream.lastFinished()) return false; + msg = ByteBuffer.wrap(msg0); + commState.idx++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b8e35888/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java index 906efa2..7d93503 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java @@ -78,7 +78,7 @@ public class GridMemcachedMessageWrapper extends GridTcpCommunicationMessageAdap commState.typeWritten = true; } - stream.writeByteArrayNoLength(bytes); + stream.writeByteArray(bytes, 0, bytes.length); return stream.lastFinished(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b8e35888/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java index f15e13b..1c7cbd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java @@ -106,7 +106,7 @@ public abstract class GridClientByteUtils { * @return Encoded into byte array {@link UUID}. */ public static byte[] uuidToBytes(UUID uuid) { - byte[] bytes = new byte[(Long.SIZE >> 3)* 2]; + byte[] bytes = new byte[(Long.SIZE >> 3) * 2]; uuidToBytes(uuid, bytes, 0); @@ -122,7 +122,6 @@ public abstract class GridClientByteUtils { * @return Number of bytes overwritten in {@code bytes} array. */ public static int uuidToBytes(UUID uuid, byte[] bytes, int off) { - ByteBuffer buf = ByteBuffer.wrap(bytes, off, 16); buf.order(ByteOrder.BIG_ENDIAN); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b8e35888/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java index ea4a36f..76b6eb7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java @@ -276,10 +276,10 @@ public class GridTcpCommunicationByteBufferStream { /** * @param val Array. */ - public void writeByteArrayNoLength(byte[] val) { + public void writeByteArray(byte[] val, int off, int len) { assert val != null; - lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length, true); + lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len, true); } /** {@inheritDoc} */ @@ -419,6 +419,14 @@ public class GridTcpCommunicationByteBufferStream { return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF); } + /** + * @param len Length. + * @return Array. + */ + public byte[] readByteArray(int len) { + return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF, len); + } + /** {@inheritDoc} */ public boolean readBoolean() { assert buf.hasRemaining(); @@ -639,16 +647,29 @@ public class GridTcpCommunicationByteBufferStream { * @return Array or special value if it was not fully read. */ private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) { + return readArray(creator, lenShift, off, -1); + } + + /** + * @param creator Array creator. + * @param lenShift Array length shift size. + * @param off Base offset. + * @param len Length. + * @return Array or special value if it was not fully read. + */ + private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off, int len) { assert creator != null; if (tmpArr == null) { - if (remaining() < 4) { - lastFinished = false; + if (len == -1) { + if (remaining() < 4) { + lastFinished = false; - return null; - } + return null; + } - int len = readInt(); + len = readInt(); + } switch (len) { case -1: