IGNITE-2105 - Fixed nested collections in direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c61598b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c61598b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c61598b Branch: refs/heads/ignite-1537 Commit: 6c61598bc66594e535af4fb10f34abe6797b72c0 Parents: 10b83fb Author: Valentin Kulichenko <valentin.kuliche...@gmail.com> Authored: Fri Dec 11 17:00:25 2015 -0800 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Fri Dec 11 17:00:25 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectMessageWriter.java | 108 ++++++++++++++----- .../ignite/util/GridMessageCollectionTest.java | 34 ++++-- 2 files changed, 111 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6c61598b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index ad122ba..085cf68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -38,44 +38,29 @@ import org.jetbrains.annotations.Nullable; * Message writer implementation. */ public class DirectMessageWriter implements MessageWriter { - /** Stream. */ - private final DirectByteBufferStream stream; - /** State. 
*/ - private final DirectMessageState<StateItem> state = new DirectMessageState<>(StateItem.class, - new IgniteOutClosure<StateItem>() { - @Override public StateItem apply() { - return new StateItem(); - } - }); + private final DirectMessageState<StateItem> state; /** * @param protoVer Protocol version. */ - public DirectMessageWriter(byte protoVer) { - switch (protoVer) { - case 1: - stream = new DirectByteBufferStreamImplV1(null); - - break; - - case 2: - stream = new DirectByteBufferStreamImplV2(null); - - break; - - default: - throw new IllegalStateException("Invalid protocol version: " + protoVer); - } + public DirectMessageWriter(final byte protoVer) { + state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() { + @Override public StateItem apply() { + return new StateItem(protoVer); + } + }); } /** {@inheritDoc} */ @Override public void setBuffer(ByteBuffer buf) { - stream.setBuffer(buf); + state.item().stream.setBuffer(buf); } /** {@inheritDoc} */ @Override public boolean writeHeader(byte type, byte fieldCnt) { + DirectByteBufferStream stream = state.item().stream; + stream.writeByte(type); return stream.lastFinished(); @@ -83,6 +68,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeByte(String name, byte val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeByte(val); return stream.lastFinished(); @@ -90,6 +77,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeShort(String name, short val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeShort(val); return stream.lastFinished(); @@ -97,6 +86,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeInt(String name, int val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeInt(val); return stream.lastFinished(); @@ -104,6 +95,8 @@ 
public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeLong(String name, long val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeLong(val); return stream.lastFinished(); @@ -111,6 +104,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeFloat(String name, float val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeFloat(val); return stream.lastFinished(); @@ -118,6 +113,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeDouble(String name, double val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeDouble(val); return stream.lastFinished(); @@ -125,6 +122,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeChar(String name, char val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeChar(val); return stream.lastFinished(); @@ -132,6 +131,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeBoolean(String name, boolean val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeBoolean(val); return stream.lastFinished(); @@ -139,6 +140,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeByteArray(String name, @Nullable byte[] val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeByteArray(val); return stream.lastFinished(); @@ -146,6 +149,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeByteArray(String name, byte[] val, long off, int len) { + DirectByteBufferStream stream = state.item().stream; + stream.writeByteArray(val, off, len); return stream.lastFinished(); @@ -153,6 +158,8 @@ public class 
DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeShortArray(String name, @Nullable short[] val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeShortArray(val); return stream.lastFinished(); @@ -160,6 +167,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeIntArray(String name, @Nullable int[] val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeIntArray(val); return stream.lastFinished(); @@ -167,6 +176,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeLongArray(String name, @Nullable long[] val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeLongArray(val); return stream.lastFinished(); @@ -174,6 +185,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeFloatArray(String name, @Nullable float[] val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeFloatArray(val); return stream.lastFinished(); @@ -181,6 +194,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeDoubleArray(String name, @Nullable double[] val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeDoubleArray(val); return stream.lastFinished(); @@ -188,6 +203,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeCharArray(String name, @Nullable char[] val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeCharArray(val); return stream.lastFinished(); @@ -195,6 +212,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeBooleanArray(String name, @Nullable boolean[] val) { + DirectByteBufferStream stream = state.item().stream; + 
stream.writeBooleanArray(val); return stream.lastFinished(); @@ -202,6 +221,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeString(String name, String val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeString(val); return stream.lastFinished(); @@ -209,6 +230,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeBitSet(String name, BitSet val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeBitSet(val); return stream.lastFinished(); @@ -216,6 +239,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeUuid(String name, UUID val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeUuid(val); return stream.lastFinished(); @@ -223,6 +248,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeIgniteUuid(String name, IgniteUuid val) { + DirectByteBufferStream stream = state.item().stream; + stream.writeIgniteUuid(val); return stream.lastFinished(); @@ -230,6 +257,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeMessage(String name, @Nullable Message msg) { + DirectByteBufferStream stream = state.item().stream; + stream.writeMessage(msg, this); return stream.lastFinished(); @@ -237,6 +266,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType) { + DirectByteBufferStream stream = state.item().stream; + stream.writeObjectArray(arr, itemType, this); return stream.lastFinished(); @@ -244,6 +275,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public <T> boolean writeCollection(String name, Collection<T> col, 
MessageCollectionItemType itemType) { + DirectByteBufferStream stream = state.item().stream; + stream.writeCollection(col, itemType, this); return stream.lastFinished(); @@ -252,6 +285,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType) { + DirectByteBufferStream stream = state.item().stream; + stream.writeMap(map, keyType, valType, this); return stream.lastFinished(); @@ -296,11 +331,34 @@ public class DirectMessageWriter implements MessageWriter { */ private static class StateItem implements DirectMessageStateItem { /** */ + private final DirectByteBufferStream stream; + + /** */ private int state; /** */ private boolean hdrWritten; + /** + * @param protoVer Protocol version. + */ + public StateItem(byte protoVer) { + switch (protoVer) { + case 1: + stream = new DirectByteBufferStreamImplV1(null); + + break; + + case 2: + stream = new DirectByteBufferStreamImplV2(null); + + break; + + default: + throw new IllegalStateException("Invalid protocol version: " + protoVer); + } + } + /** {@inheritDoc} */ @Override public void reset() { state = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/6c61598b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java index e910a8a..44df767 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java @@ -24,6 +24,9 @@ import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import 
org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import static java.util.UUID.randomUUID; import static org.apache.ignite.internal.util.GridMessageCollection.of; @@ -36,6 +39,23 @@ public class GridMessageCollectionTest extends TestCase { private byte proto; /** + * @param proto Protocol version. + * @return Writer. + */ + protected MessageWriter writer(byte proto) { + return new DirectMessageWriter(proto); + } + + /** + * @param msgFactory Message factory. + * @param proto Protocol version. + * @return Reader. + */ + protected MessageReader reader(MessageFactory msgFactory, byte proto) { + return new DirectMessageReader(msgFactory, proto); + } + + /** * */ public void testMarshal() { @@ -88,17 +108,19 @@ public class GridMessageCollectionTest extends TestCase { private void doTestMarshal(Message m) { ByteBuffer buf = ByteBuffer.allocate(8 * 1024); - DirectMessageWriter w = new DirectMessageWriter(proto); - - m.writeTo(buf, w); + m.writeTo(buf, writer(proto)); buf.flip(); - DirectMessageReader r = new DirectMessageReader(new GridIoMessageFactory(null), proto); + byte type = buf.get(); + + assertEquals(m.directType(), type); + + GridIoMessageFactory msgFactory = new GridIoMessageFactory(null); - r.setBuffer(buf); + Message mx = msgFactory.create(type); - Message mx = r.readMessage(null); + mx.readFrom(buf, reader(msgFactory, proto)); assertEquals(m, mx); }