IGNITE-5899 Thin client: cache.Get for primitives This closes #2376
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb6f3fa8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb6f3fa8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb6f3fa8 Branch: refs/heads/ignite-5896 Commit: bb6f3fa8d6a634f2112d5764f4548a3860ee244e Parents: eae6e3b Author: Pavel Tupitsyn <[email protected]> Authored: Tue Aug 29 18:57:07 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Wed Aug 30 18:59:49 2017 +0300 ---------------------------------------------------------------------- .../processors/odbc/SqlListenerNioListener.java | 15 +- .../platform/client/ClientCacheRequest.java | 54 ++++++ .../platform/client/ClientGetRequest.java | 48 ++++++ .../platform/client/ClientGetResponse.java | 46 +++++ .../platform/client/ClientMessageParser.java | 83 +++++++++ .../platform/client/ClientRequest.java | 58 +++++++ .../platform/client/ClientRequestHandler.java | 58 +++++++ .../platform/client/ClientResponse.java | 49 ++++++ .../Apache.Ignite.Core.Tests.csproj | 1 + .../Client/RawSocketTest.cs | 167 +++++++++++++++++++ 10 files changed, 577 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java index 8dad71b..3e8299c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java @@ -32,6 +32,8 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMessageParser; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequestHandler; import org.apache.ignite.internal.processors.odbc.odbc.OdbcMessageParser; import org.apache.ignite.internal.processors.odbc.odbc.OdbcRequestHandler; +import org.apache.ignite.internal.processors.platform.client.ClientMessageParser; +import org.apache.ignite.internal.processors.platform.client.ClientRequestHandler; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; import org.apache.ignite.internal.util.nio.GridNioSession; @@ -42,10 +44,10 @@ import org.jetbrains.annotations.Nullable; * SQL message listener. */ public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> { - /** The value corresponds to ODBC driver of the parser field of the handshake request. */ + /** ODBC driver handshake code. */ public static final byte ODBC_CLIENT = 0; - /** The value corresponds to JDBC driver of the parser field of the handshake request. */ + /** JDBC driver handshake code. */ public static final byte JDBC_CLIENT = 1; /** Version 2.1.0. */ @@ -54,6 +56,9 @@ public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> /** Version 2.1.5: added "lazy" flag. */ private static final SqlListenerProtocolVersion VER_2_1_5 = SqlListenerProtocolVersion.create(2, 1, 5); + /** Thin client handshake code. */ + public static final byte THIN_CLIENT = 2; + /** Current version. */ private static final SqlListenerProtocolVersion CURRENT_VER = VER_2_1_5; @@ -278,6 +283,12 @@ public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> return new SqlListenerConnectionContext(handler, parser); } + else if (clientType == THIN_CLIENT) { + ClientMessageParser parser = new ClientMessageParser(ctx); + ClientRequestHandler handler = new ClientRequestHandler(ctx); + + return new SqlListenerConnectionContext(handler, parser); + } else throw new IgniteException("Unknown client type: " + clientType); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientCacheRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientCacheRequest.java new file mode 100644 index 0000000..7036853 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientCacheRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.GridKernalContext; + +/** + * Cache get request. + */ +class ClientCacheRequest extends ClientRequest { + /** */ + private final int cacheId; + + /** + * Ctor. + * + * @param reader Reader. + */ + ClientCacheRequest(BinaryRawReader reader) { + super(reader); + + cacheId = reader.readInt(); + reader.readByte(); // Flags (skipStore, etc); + } + + /** + * Gets the cache for current cache id. + * + * @param ctx Kernal context. + * @return Cache. + */ + protected IgniteCache getCache(GridKernalContext ctx) { + String cacheName = ctx.cache().context().cacheContext(cacheId).cache().name(); + + return ctx.grid().cache(cacheName).withKeepBinary(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetRequest.java new file mode 100644 index 0000000..72d3507 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetRequest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; + +/** + * Cache get request. + */ +class ClientGetRequest extends ClientCacheRequest { + /** */ + private final Object key; + + /** + * Ctor. + * + * @param reader Reader. + */ + ClientGetRequest(BinaryRawReaderEx reader) { + super(reader); + + key = reader.readObjectDetached(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public ClientResponse process(GridKernalContext ctx) { + Object val = getCache(ctx).get(key); + + return new ClientGetResponse(getRequestId(), val); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetResponse.java new file mode 100644 index 0000000..58a3062 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientGetResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client; + +import org.apache.ignite.binary.BinaryRawWriter; + +/** + * Cache get response. + */ +class ClientGetResponse extends ClientResponse { + /** */ + private final Object val; + + /** + * Ctor. + * + * @param requestId Request id. + */ + ClientGetResponse(int requestId, Object val) { + super(requestId); + + this.val = val; + } + + /** {@inheritDoc} */ + @Override public void encode(BinaryRawWriter writer) { + super.encode(writer); + + writer.writeObject(val); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java new file mode 100644 index 0000000..5ad7ba9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryRawWriter; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.GridBinaryMarshaller; +import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.binary.streams.BinaryInputStream; +import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.odbc.SqlListenerMessageParser; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; +import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; + +/** + * Thin client message parser. + */ +public class ClientMessageParser implements SqlListenerMessageParser { + /** */ + private static final short OP_CACHE_GET = 1; + + /** Marshaller. */ + private final GridBinaryMarshaller marsh; + + /** + * Ctor. + * + * @param ctx Kernal context. + */ + public ClientMessageParser(GridKernalContext ctx) { + assert ctx != null; + + CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects(); + marsh = cacheObjProc.marshaller(); + } + + /** {@inheritDoc} */ + @Override public SqlListenerRequest decode(byte[] msg) { + assert msg != null; + + BinaryInputStream inStream = new BinaryHeapInputStream(msg); + BinaryRawReaderEx reader = marsh.reader(inStream); + + short opCode = reader.readShort(); + + switch (opCode) { + case OP_CACHE_GET: { + return new ClientGetRequest(reader); + } + } + + throw new IgniteException("Invalid operation: " + opCode); + } + + /** {@inheritDoc} */ + @Override public byte[] encode(SqlListenerResponse resp) { + BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32); + + BinaryRawWriter writer = marsh.writer(outStream); + + ((ClientResponse)resp).encode(writer); + + return outStream.array(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java new file mode 100644 index 0000000..f542850 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client; + +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; + +/** + * Thin client request. + */ +class ClientRequest extends SqlListenerRequest { + /** Request id. */ + private final int requestId; + + /** + * Ctor. + * + * @param reader Reader. + */ + ClientRequest(BinaryRawReader reader) { + reader.readByte(); // Flags: Compression, etc. + requestId = reader.readInt(); + } + + /** + * Gets the request id. + * + * @return Data. + */ + public int getRequestId() { + return requestId; + } + + /** + * Processes the request. + * + * @return Response. + */ + public ClientResponse process(GridKernalContext ctx) { + return new ClientResponse(requestId); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java new file mode 100644 index 0000000..7f019cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequestHandler; +import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; + +/** + * Thin client request handler. + */ +public class ClientRequestHandler implements SqlListenerRequestHandler { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** + * Ctor. + * + * @param ctx Kernal context. + */ + public ClientRequestHandler(GridKernalContext ctx) { + assert ctx != null; + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public SqlListenerResponse handle(SqlListenerRequest req) { + return ((ClientRequest)req).process(ctx); + } + + /** {@inheritDoc} */ + @Override public SqlListenerResponse handleException(Exception e) { + return null; + } + + /** {@inheritDoc} */ + @Override public void writeHandshake(BinaryWriterExImpl writer) { + writer.writeBoolean(true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java new file mode 100644 index 0000000..2fe65c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResponse.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client; + +import org.apache.ignite.binary.BinaryRawWriter; +import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; +import org.jetbrains.annotations.Nullable; + +/** + * Thin client response. + */ +class ClientResponse extends SqlListenerResponse { + /** Request id. */ + private final int requestId; + + /** + * Ctor. + * + * @param requestId Request id. + */ + ClientResponse(int requestId) { + super(STATUS_SUCCESS, null); + + this.requestId = requestId; + } + + /** + * Encodes the response data. + */ + public void encode(BinaryRawWriter writer) { + writer.writeInt(requestId); + writer.writeByte((byte)0); // Flags (compression, etc); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index 3f5f9b3..f704005 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -92,6 +92,7 @@ <Compile Include="Cache\Query\Linq\CacheLinqTest.Contains.cs" /> <Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" /> <Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" /> + <Compile Include="Client\RawSocketTest.cs" /> <Compile Include="Deployment\CacheGetFunc.cs" /> <Compile Include="Deployment\GetAddressFunc.cs" /> <Compile Include="Deployment\PeerAssemblyLoadingAllApisTest.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/bb6f3fa8/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs new file mode 100644 index 0000000..9d8c427 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Client +{ + using System; + using System.Net; + using System.Net.Sockets; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Configuration; + using Apache.Ignite.Core.Impl; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + using NUnit.Framework; + + /// <summary> + /// Tests the thin client mode with a raw socket. + /// </summary> + public class RawSocketTest + { + /// <summary> + /// Tests the socket handshake connection. + /// </summary> + [Test] + public void TestCacheGet() + { + var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + SqlConnectorConfiguration = new SqlConnectorConfiguration() + }; + + using (var ignite = Ignition.Start(cfg)) + { + var marsh = ((Ignite) ignite).Marshaller; + + // Create cache. + var cacheCfg = new CacheConfiguration("foo", new QueryEntity(typeof(int), typeof(string))); + var cache = ignite.CreateCache<int, string>(cacheCfg); + cache[1] = "bar"; + + // Connect socket. + var sock = GetSocket(SqlConnectorConfiguration.DefaultPort); + Assert.IsTrue(sock.Connected); + + DoHandshake(sock); + + // Cache get. + SendRequest(sock, stream => + { + stream.WriteShort(1); // OP_GET + stream.WriteByte(0); // Flags (compression, etc) + stream.WriteInt(1); // Request id. + var cacheId = BinaryUtils.GetStringHashCode(cache.Name); + stream.WriteInt(cacheId); + stream.WriteByte(0); // Flags (withSkipStore, etc) + + var writer = marsh.StartMarshal(stream); + + writer.WriteObject(1); // Key + }); + + var msg = ReceiveMessage(sock); + + using (var stream = new BinaryHeapStream(msg)) + { + var reader = marsh.StartUnmarshal(stream); + + int requestId = reader.ReadInt(); + Assert.AreEqual(1, requestId); + + reader.ReadByte(); // Flags + + var res = reader.ReadObject<string>(); + Assert.AreEqual(cache[1], res); + } + } + } + + /// <summary> + /// Does the handshake. + /// </summary> + /// <param name="sock">The sock.</param> + private static void DoHandshake(Socket sock) + { + var sentBytes = SendRequest(sock, stream => + { + // Handshake. + stream.WriteByte(1); + + // Protocol version. + stream.WriteShort(2); + stream.WriteShort(1); + stream.WriteShort(0); + + // Client type: platform. + stream.WriteByte(2); + }); + + Assert.AreEqual(12, sentBytes); + + // ACK. + var ack = ReceiveMessage(sock); + + Assert.AreEqual(1, ack.Length); + Assert.AreEqual(1, ack[0]); + } + + /// <summary> + /// Receives the message. + /// </summary> + private static byte[] ReceiveMessage(Socket sock) + { + var buf = new byte[4]; + sock.Receive(buf); + + using (var stream = new BinaryHeapStream(buf)) + { + var size = stream.ReadInt(); + buf = new byte[size]; + sock.Receive(buf); + return buf; + } + } + + /// <summary> + /// Sends the request. + /// </summary> + private static int SendRequest(Socket sock, Action<BinaryHeapStream> writeAction) + { + using (var stream = new BinaryHeapStream(128)) + { + stream.WriteInt(0); // Reserve message size. + + writeAction(stream); + + stream.WriteInt(0, stream.Position - 4); // Write message size. + + return sock.Send(stream.GetArray(), stream.Position, SocketFlags.None); + } + } + + /// <summary> + /// Gets the socket. + /// </summary> + private static Socket GetSocket(int port) + { + var endPoint = new IPEndPoint(IPAddress.Loopback, port); + var sock = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + sock.Connect(endPoint); + return sock; + } + } +}
