Repository: ignite Updated Branches: refs/heads/ignite-5896 79fc39f7b -> 757f56a4e
IGNITE-5905 .NET: Thin client: cache.Get for primitives This closes #2542 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/757f56a4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/757f56a4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/757f56a4 Branch: refs/heads/ignite-5896 Commit: 757f56a4e20bbd280857b3aaea2e9dcc9df2dbcd Parents: 79fc39f Author: Pavel Tupitsyn <[email protected]> Authored: Wed Aug 30 18:58:57 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Wed Aug 30 18:58:57 2017 +0300 ---------------------------------------------------------------------- .../processors/odbc/SqlListenerNioListener.java | 6 +- .../Apache.Ignite.Core.Tests.csproj | 3 + .../Client/CacheTest.cs | 126 ++++ .../Client/ClientConnectionTest.cs | 120 ++++ .../Client/IgniteClientConfigurationTest.cs | 42 ++ .../IgniteConfigurationTest.cs | 2 + .../Apache.Ignite.Core.csproj | 6 + .../Client/IgniteClientConfiguration.cs | 92 +++ .../dotnet/Apache.Ignite.Core/Ignition.cs | 25 + .../Impl/Binary/BinaryUtils.cs | 16 +- .../Impl/Cache/CacheClient.cs | 639 +++++++++++++++++++ .../Apache.Ignite.Core/Impl/Client/ClientOp.cs | 27 + .../Impl/Client/ClientProtocolVersion.cs | 107 ++++ .../Impl/Client/ClientSocket.cs | 254 ++++++++ .../Impl/Client/IgniteClient.cs | 300 +++++++++ 15 files changed, 1757 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java index 3e8299c..4567cb8 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java @@ -50,15 +50,15 @@ public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> /** JDBC driver handshake code. */ public static final byte JDBC_CLIENT = 1; + /** Thin client handshake code. */ + public static final byte THIN_CLIENT = 2; + /** Version 2.1.0. */ private static final SqlListenerProtocolVersion VER_2_1_0 = SqlListenerProtocolVersion.create(2, 1, 0); /** Version 2.1.5: added "lazy" flag. */ private static final SqlListenerProtocolVersion VER_2_1_5 = SqlListenerProtocolVersion.create(2, 1, 5); - /** Thin client handshake code. */ - public static final byte THIN_CLIENT = 2; - /** Current version. */ private static final SqlListenerProtocolVersion CURRENT_VER = VER_2_1_5; http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index f704005..c9942ca 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -93,6 +93,9 @@ <Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" /> <Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" /> <Compile Include="Client\RawSocketTest.cs" /> + <Compile Include="Client\CacheTest.cs" /> + <Compile Include="Client\ClientConnectionTest.cs" /> + <Compile Include="Client\IgniteClientConfigurationTest.cs" /> <Compile Include="Deployment\CacheGetFunc.cs" /> <Compile Include="Deployment\GetAddressFunc.cs" /> <Compile 
Include="Deployment\PeerAssemblyLoadingAllApisTest.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/CacheTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/CacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/CacheTest.cs new file mode 100644 index 0000000..53cffd0 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/CacheTest.cs @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Client +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using Apache.Ignite.Core.Cache; + using NUnit.Framework; + + /// <summary> + /// Thin client cache test. + /// </summary> + public class CacheTest + { + /** Cache name. */ + private const string CacheName = "cache"; + + /// <summary> + /// Fixture set up. + /// </summary> + [TestFixtureSetUp] + public void FixtureSetUp() + { + Ignition.Start(TestUtils.GetTestConfiguration()); + } + + /// <summary> + /// Fixture tear down. 
+ /// </summary> + [TestFixtureTearDown] + public void FixtureTearDown() + { + Ignition.StopAll(true); + } + + /// <summary> + /// Tests the cache put / get with primitive data types. + /// </summary> + [Test] + public void TestPutGetPrimitives() + { + using (var client = Ignition.GetClient()) + { + GetCache().Put(1, "foo"); + + var clientCache = client.GetCache<int, string>(CacheName); + + // Existing key. + Assert.AreEqual("foo", clientCache.Get(1)); + Assert.AreEqual("foo", clientCache[1]); + + // Missing key. + Assert.Throws<KeyNotFoundException>(() => clientCache.Get(2)); + } + } + + /// <summary> + /// Tests client get in multiple threads with a single client. + /// </summary> + [Test] + [Category(TestUtils.CategoryIntensive)] + public void TestGetMultithreadedSingleClient() + { + GetCache().Put(1, "foo"); + + using (var client = Ignition.GetClient()) + { + var clientCache = client.GetCache<int, string>(CacheName); + + TestUtils.RunMultiThreaded(() => Assert.AreEqual("foo", clientCache.Get(1)), + Environment.ProcessorCount, 5); + } + } + + /// <summary> + /// Tests client get in multiple threads with multiple clients. + /// </summary> + [Test] + [Category(TestUtils.CategoryIntensive)] + public void TestGetMultithreadedMultiClient() + { + GetCache().Put(1, "foo"); + + // One client per thread. + ConcurrentDictionary<int, IIgnite> clients = new ConcurrentDictionary<int, IIgnite>(); + + TestUtils.RunMultiThreaded(() => + { + var client = clients.GetOrAdd(Thread.CurrentThread.ManagedThreadId, _ => Ignition.GetClient()); + + var clientCache = client.GetCache<int, string>(CacheName); + + Assert.AreEqual("foo", clientCache.Get(1)); + }, + Environment.ProcessorCount, 5); + + clients.ToList().ForEach(x => x.Value.Dispose()); + } + + /// <summary> + /// Gets the cache. 
+ /// </summary> + private static ICache<int, string> GetCache() + { + return Ignition.GetIgnite().GetOrCreateCache<int, string>(CacheName); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs new file mode 100644 index 0000000..c6743b1 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Client +{ + using System; + using System.Linq; + using System.Net.Sockets; + using Apache.Ignite.Core.Client; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Configuration; + using Apache.Ignite.Core.Impl.Client; + using NUnit.Framework; + + /// <summary> + /// Tests client connection: port ranges, version checks, etc. + /// </summary> + public class ClientConnectionTest + { + /// <summary> + /// Fixture tear down. 
+ /// </summary> + [TestFixtureTearDown] + public void FixtureTearDown() + { + Ignition.StopAll(true); + } + + /// <summary> + /// Tests that missing server yields connection refused error. + /// </summary> + [Test] + public void TestNoServerConnectionRefused() + { + var ex = Assert.Throws<AggregateException>(() => Ignition.GetClient()); + var socketEx = ex.InnerExceptions.OfType<SocketException>().First(); + Assert.AreEqual(SocketError.ConnectionRefused, socketEx.SocketErrorCode); + } + + /// <summary> + /// Tests that multiple clients can connect to one server. + /// </summary> + [Test] + public void TestMultipleClients() + { + using (Ignition.Start(TestUtils.GetTestConfiguration())) + { + var client1 = Ignition.GetClient(); + var client2 = Ignition.GetClient(); + var client3 = Ignition.GetClient(); + + client1.Dispose(); + client2.Dispose(); + client3.Dispose(); + } + } + + /// <summary> + /// Tests custom connector and client configuration. + /// </summary> + [Test] + [Category(TestUtils.CategoryIntensive)] + public void TestCustomConfig() + { + var servCfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + SqlConnectorConfiguration = new SqlConnectorConfiguration + { + Host = "localhost", + Port = 2000, + PortRange = 1 + } + }; + + var clientCfg = new IgniteClientConfiguration + { + Host = "localhost", + Port = 2000 + }; + + using (Ignition.Start(servCfg)) + using (Ignition.GetClient(clientCfg)) + { + // No-op. + } + } + + /// <summary> + /// Tests the incorrect protocol version error. + /// </summary> + [Test] + [Category(TestUtils.CategoryIntensive)] + public void TestIncorrectProtocolVersionError() + { + using (Ignition.Start(TestUtils.GetTestConfiguration())) + { + // ReSharper disable once ObjectCreationAsStatement + var ex = Assert.Throws<IgniteException>(() => new ClientSocket(new IgniteClientConfiguration(), + new ClientProtocolVersion(-1, -1, -1))); + + Assert.AreEqual("Client handhsake failed: 'Unsupported version.'. 
" + + "Client version: -1.-1.-1. Server version: 2.1.5", ex.Message); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs new file mode 100644 index 0000000..0734f42 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Client +{ + using Apache.Ignite.Core.Client; + using NUnit.Framework; + + /// <summary> + /// Tests for <see cref="IgniteClientConfiguration"/>. + /// </summary> + public class IgniteClientConfigurationTest + { + /// <summary> + /// Tests the defaults. 
+ /// </summary> + [Test] + public void TestDefaults() + { + var cfg = new IgniteClientConfiguration(); + + Assert.AreEqual(IgniteClientConfiguration.DefaultPort, cfg.Port); + Assert.AreEqual(IgniteClientConfiguration.DefaultSocketBufferSize, cfg.SocketReceiveBufferSize); + Assert.AreEqual(IgniteClientConfiguration.DefaultSocketBufferSize, cfg.SocketSendBufferSize); + Assert.AreEqual(IgniteClientConfiguration.DefaultTcpNoDelay, cfg.TcpNoDelay); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs index 93d6af3..950f36d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs @@ -26,6 +26,7 @@ namespace Apache.Ignite.Core.Tests using Apache.Ignite.Core.Cache.Affinity.Rendezvous; using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Cache.Eviction; + using Apache.Ignite.Core.Client; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Communication.Tcp; using Apache.Ignite.Core.Configuration; @@ -87,6 +88,7 @@ namespace Apache.Ignite.Core.Tests CheckDefaultValueAttributes(new MemoryPolicyConfiguration()); CheckDefaultValueAttributes(new SqlConnectorConfiguration()); CheckDefaultValueAttributes(new PersistentStoreConfiguration()); + CheckDefaultValueAttributes(new IgniteClientConfiguration()); } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj 
b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index c444ed0..8a384fd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -97,8 +97,14 @@ <Compile Include="Cache\Configuration\MemoryPolicyConfiguration.cs" /> <Compile Include="Cache\Configuration\PartitionLossPolicy.cs" /> <Compile Include="Cache\IMemoryMetrics.cs" /> + <Compile Include="Client\IgniteClientConfiguration.cs" /> <Compile Include="Common\ExceptionFactory.cs" /> <Compile Include="Configuration\Package-Info.cs" /> + <Compile Include="Impl\Cache\CacheClient.cs" /> + <Compile Include="Impl\Client\ClientOp.cs" /> + <Compile Include="Impl\Client\ClientProtocolVersion.cs" /> + <Compile Include="Impl\Client\ClientSocket.cs" /> + <Compile Include="Impl\Client\IgniteClient.cs" /> <Compile Include="Impl\IPlatformTargetInternal.cs" /> <Compile Include="Impl\PersistentStore\PersistentStoreMetrics.cs" /> <Compile Include="Impl\PlatformDisposableTargetAdapter.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs new file mode 100644 index 0000000..0cd9be2 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Client +{ + using System.ComponentModel; + + /// <summary> + /// Ignite thin client configuration. + /// <para /> + /// Ignite thin client connects to a specific Ignite node with a socket and does not start JVM in process. + /// This configuration should correspond to <see cref="IgniteConfiguration.SqlConnectorConfiguration"/> + /// on a target node. + /// </summary> + public class IgniteClientConfiguration + { + /// <summary> + /// Default port. + /// </summary> + public const int DefaultPort = 10800; + + /// <summary> + /// Default socket buffer size. + /// </summary> + public const int DefaultSocketBufferSize = 0; + + /// <summary> + /// Default value of <see cref="TcpNoDelay" /> property. + /// </summary> + public const bool DefaultTcpNoDelay = true; + + /// <summary> + /// Initializes a new instance of the <see cref="IgniteClientConfiguration"/> class. + /// </summary> + public IgniteClientConfiguration() + { + Port = DefaultPort; + SocketSendBufferSize = DefaultSocketBufferSize; + SocketReceiveBufferSize = DefaultSocketBufferSize; + TcpNoDelay = DefaultTcpNoDelay; + } + + /// <summary> + /// Gets or sets the host. Null for loopback. + /// </summary> + public string Host { get; set; } + + /// <summary> + /// Gets or sets the port. + /// </summary> + [DefaultValue(DefaultPort)] + public int Port { get; set; } + + /// <summary> + /// Gets or sets the size of the socket send buffer. When set to 0, operating system default is used. 
+ /// </summary> + [DefaultValue(DefaultSocketBufferSize)] + public int SocketSendBufferSize { get; set; } + + /// <summary> + /// Gets or sets the size of the socket receive buffer. When set to 0, operating system default is used. + /// </summary> + [DefaultValue(DefaultSocketBufferSize)] + public int SocketReceiveBufferSize { get; set; } + + /// <summary> + /// Gets or sets the value for <c>TCP_NODELAY</c> socket option. Each + /// socket will be opened using provided value. + /// <para /> + /// Setting this option to <c>true</c> disables Nagle's algorithm + /// for the socket, decreasing latency and delivery time for small messages. + /// <para /> + /// For systems that work under heavy network load it is advisable to set this value to <c>false</c>. + /// </summary> + [DefaultValue(DefaultTcpNoDelay)] + public bool TcpNoDelay { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs index 568eea7..9ee7c26 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs @@ -28,11 +28,13 @@ namespace Apache.Ignite.Core using System.Threading; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Client; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Cache.Affinity; + using Apache.Ignite.Core.Impl.Client; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Handle; using Apache.Ignite.Core.Impl.Log; @@ -730,6 +732,29 @@ namespace Apache.Ignite.Core } /// <summary> + /// Connects Ignite lightweight (thin) client to a local Ignite node. 
+ /// <para /> + /// Thin client connects to an existing Ignite node with a socket and does not start JVM in process. + /// </summary> + /// <returns>Ignite instance.</returns> + public static IIgnite GetClient() + { + return new IgniteClient(new IgniteClientConfiguration()); + } + + /// <summary> + /// Connects Ignite lightweight (thin) client to an Ignite node. + /// <para /> + /// Thin client connects to an existing Ignite node with a socket and does not start JVM in process. + /// </summary> + /// <param name="clientConfiguration">The client configuration.</param> + /// <returns>Ignite instance.</returns> + public static IIgnite GetClient(IgniteClientConfiguration clientConfiguration) + { + return new IgniteClient(clientConfiguration); + } + + /// <summary> /// Handles the DomainUnload event of the CurrentDomain control. /// </summary> /// <param name="sender">The source of the event.</param> http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs index 91a536e..412a3cc 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs @@ -1471,11 +1471,9 @@ namespace Apache.Ignite.Core.Impl.Binary return res; } - /** - * <summary>Get string hash code.</summary> - * <param name="val">Value.</param> - * <returns>Hash code.</returns> - */ + /// <summary> + /// Gets the string hash code using Java algorithm. + /// </summary> public static int GetStringHashCode(string val) { if (val == null) @@ -1494,6 +1492,14 @@ namespace Apache.Ignite.Core.Impl.Binary } /// <summary> + /// Gets the cache identifier. 
+ /// </summary> + public static int GetCacheId(string cacheName) + { + return string.IsNullOrEmpty(cacheName) ? 1 : GetStringHashCode(cacheName); + } + + /// <summary> /// Cleans the name of the field. /// </summary> public static string CleanFieldName(string fieldName) http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheClient.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheClient.cs new file mode 100644 index 0000000..b8bf95e --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheClient.cs @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Threading.Tasks; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Cache.Expiry; + using Apache.Ignite.Core.Cache.Query; + using Apache.Ignite.Core.Cache.Query.Continuous; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Client; + using Apache.Ignite.Core.Impl.Common; + + /// <summary> + /// Client cache implementation. + /// </summary> + internal class CacheClient<TK, TV> : ICache<TK, TV> + { + /** Socket. */ + private readonly ClientSocket _socket; + + /** Cache name. */ + private readonly string _name; + + /** Cache id. */ + private readonly int _id; + + /** Marshaller */ + private readonly Marshaller _marsh = BinaryUtils.Marshaller; + + /// <summary> + /// Initializes a new instance of the <see cref="CacheClient{TK, TV}" /> class. 
+ /// </summary> + /// <param name="socket">The socket.</param> + /// <param name="name">Cache name.</param> + public CacheClient(ClientSocket socket, string name) + { + Debug.Assert(socket != null); + Debug.Assert(name != null); + + _socket = socket; + _name = name; + _id = BinaryUtils.GetCacheId(name); + } + + /** <inheritDoc /> */ + public IEnumerator<ICacheEntry<TK, TV>> GetEnumerator() + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + /** <inheritDoc /> */ + public string Name + { + get { return _name; } + } + + /** <inheritDoc /> */ + public IIgnite Ignite + { + get { throw new NotImplementedException(); } + } + + /** <inheritDoc /> */ + public CacheConfiguration GetConfiguration() + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public bool IsEmpty() + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public bool IsKeepBinary + { + get { throw new NotImplementedException(); } + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithSkipStore() + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithExpiryPolicy(IExpiryPolicy plc) + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public ICache<TK1, TV1> WithKeepBinary<TK1, TV1>() + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public Task LoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args) + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) + { + throw new NotImplementedException(); + } + + /** <inheritDoc /> */ + public Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args) + { + 
throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool ContainsKey(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> ContainsKeyAsync(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool ContainsKeys(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public TV LocalPeek(TK key, params CachePeekMode[] modes)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool TryLocalPeek(TK key, out TV value, params CachePeekMode[] modes)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public TV this[TK key]
+        {
+            get
+            {
+                return Get(key);
+            }
+            set
+            {
+                Put(key, value);
+            }
+        }
+
+        /** <inheritDoc /> */
+        public TV Get(TK key)
+        {
+            // Reject null keys on the client, the same way Put validates its arguments,
+            // instead of sending a null key to the server.
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            return DoOutInOp(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalNotNull<TV>);
+        }
+
+        /** <inheritDoc /> */
+        public Task<TV> GetAsync(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool TryGet(TK key, out TV value)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> TryGetAsync(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void Put(TK key, TV val)
+        {
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
+
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task PutAsync(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public CacheResult<TV> GetAndPut(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public CacheResult<TV> GetAndReplace(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public CacheResult<TV> GetAndRemove(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndRemoveAsync(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool PutIfAbsent(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> PutIfAbsentAsync(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public CacheResult<TV> GetAndPutIfAbsent(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool Replace(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> ReplaceAsync(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool Replace(TK key, TV oldVal, TV newVal)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void PutAll(IEnumerable<KeyValuePair<TK, TV>> vals)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void LocalEvict(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void Clear()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task ClearAsync()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void Clear(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task ClearAsync(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void ClearAll(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task ClearAllAsync(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void LocalClear(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void LocalClearAll(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool Remove(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> RemoveAsync(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool Remove(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<bool> RemoveAsync(TK key, TV val)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void RemoveAll(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task RemoveAllAsync(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void RemoveAll()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task RemoveAllAsync()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public int GetLocalSize(params CachePeekMode[] modes)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public int GetSize(params CachePeekMode[] modes)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<int> GetSizeAsync(params CachePeekMode[] modes)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IQueryCursor<ICacheEntry<TK, TV>> Query(QueryBase qry)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IQueryCursor<IList> QueryFields(SqlFieldsQuery qry)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IContinuousQueryHandle QueryContinuous(ContinuousQuery<TK, TV> qry)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IContinuousQueryHandle<ICacheEntry<TK, TV>> QueryContinuous(ContinuousQuery<TK, TV> qry, QueryBase initialQry)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IEnumerable<ICacheEntry<TK, TV>> GetLocalEntries(params CachePeekMode[] peekModes)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<TRes> InvokeAsync<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<ICacheEntryProcessorResult<TK, TRes>> InvokeAll<TArg, TRes>(IEnumerable<TK> keys, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task<ICollection<ICacheEntryProcessorResult<TK, TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICacheLock Lock(TK key)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICacheLock LockAll(IEnumerable<TK> keys)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool IsLocalLocked(TK key, bool byCurrentThread)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICacheMetrics GetMetrics()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICacheMetrics GetMetrics(IClusterGroup clusterGroup)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICacheMetrics GetLocalMetrics()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public Task Rebalance()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> WithNoRetries()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> WithPartitionRecover()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<int> GetLostPartitions()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Sends a cache request through the client socket and reads the response.
+        /// Writes the cache id and a flags byte first, then lets <paramref name="writeAction"/>
+        /// append the payload through a writer started on the same stream.
+        /// </summary>
+        /// <param name="opId">Operation code.</param>
+        /// <param name="writeAction">Optional payload writer; skipped when null.</param>
+        /// <param name="readFunc">Response reader, passed through to the socket.</param>
+        private T DoOutInOp<T>(ClientOp opId, Action<IBinaryRawWriter> writeAction,
+            Func<IBinaryStream, T> readFunc)
+        {
+            return _socket.DoOutInOp(opId, stream =>
+            {
+                stream.WriteInt(_id);
+                stream.WriteByte(0);  // Flags (skipStore, etc).
+
+                if (writeAction != null)
+                {
+                    writeAction(_marsh.StartMarshal(stream));
+                }
+            }, readFunc);
+        }
+
+        /// <summary>
+        /// Unmarshals the value, throwing an exception for nulls.
+        /// </summary>
+        private T UnmarshalNotNull<T>(IBinaryStream stream)
+        {
+            // Inspect the header byte first: a null header is reported as a missing key.
+            if (stream.ReadByte() == BinaryUtils.HdrNull)
+            {
+                throw GetKeyNotFoundException();
+            }
+
+            // Step back over the header byte so that the marshaller reads the value from its start.
+            stream.Seek(-1, SeekOrigin.Current);
+
+            var result = _marsh.Unmarshal<T>(stream);
+
+            return result;
+        }
+
+        /// <summary>
+        /// Creates the exception that is thrown when a requested key has no value.
+        /// </summary>
+        private static KeyNotFoundException GetKeyNotFoundException()
+        {
+            const string message = "The given key was not present in the cache.";
+
+            return new KeyNotFoundException(message);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
new file mode 100644
index 0000000..0039085
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+    /// <summary>
+    /// Client op code.
+    /// </summary>
+    internal enum ClientOp : short
+    {
+        CacheGet = 1
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientProtocolVersion.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientProtocolVersion.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientProtocolVersion.cs
new file mode 100644
index 0000000..d06b97d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientProtocolVersion.cs
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+    using System;
+
+    /// <summary>
+    /// Immutable three-part (major.minor.maintenance) version of the client protocol.
+    /// </summary>
+    internal struct ClientProtocolVersion : IEquatable<ClientProtocolVersion>
+    {
+        /** Major part. */
+        private readonly short _major;
+
+        /** Minor part. */
+        private readonly short _minor;
+
+        /** Maintenance part. */
+        private readonly short _maintenance;
+
+        /// <summary>
+        /// Creates a version from its three parts.
+        /// </summary>
+        public ClientProtocolVersion(short major, short minor, short maintenance)
+        {
+            _major = major;
+            _minor = minor;
+            _maintenance = maintenance;
+        }
+
+        /// <summary>
+        /// Major part of the version.
+        /// </summary>
+        public short Major
+        {
+            get { return _major; }
+        }
+
+        /// <summary>
+        /// Minor part of the version.
+        /// </summary>
+        public short Minor
+        {
+            get { return _minor; }
+        }
+
+        /// <summary>
+        /// Maintenance part of the version.
+        /// </summary>
+        public short Maintenance
+        {
+            get { return _maintenance; }
+        }
+
+        /// <summary>
+        /// Compares all three parts of this version with another one.
+        /// </summary>
+        public bool Equals(ClientProtocolVersion other)
+        {
+            return _maintenance == other._maintenance && _minor == other._minor && _major == other._major;
+        }
+
+        /** <inheritdoc /> */
+        public override bool Equals(object obj)
+        {
+            // The "as" cast yields an empty nullable for null and for any other type.
+            var other = obj as ClientProtocolVersion?;
+
+            return other.HasValue && Equals(other.Value);
+        }
+
+        /** <inheritdoc /> */
+        public override int GetHashCode()
+        {
+            unchecked
+            {
+                var hash = _major.GetHashCode();
+                hash = (hash * 397) ^ _minor.GetHashCode();
+
+                return (hash * 397) ^ _maintenance.GetHashCode();
+            }
+        }
+
+        /** <inheritdoc /> */
+        public override string ToString()
+        {
+            return string.Format("{0}.{1}.{2}", _major, _minor, _maintenance);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
new file mode 100644
index 0000000..886e454
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under
one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Net;
+    using System.Net.Sockets;
+    using System.Threading;
+    using Apache.Ignite.Core.Client;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Binary.IO;
+
+    /// <summary>
+    /// Wrapper over framework socket for Ignite thin client operations.
+    /// </summary>
+    internal class ClientSocket : IDisposable
+    {
+        /** Current version. */
+        private static readonly ClientProtocolVersion CurrentProtocolVersion = new ClientProtocolVersion(2, 1, 5);
+
+        /** Handshake opcode. */
+        private const byte OpHandshake = 1;
+
+        /** Client type code. */
+        private const byte ClientType = 2;
+
+        /** Underlying socket. */
+        private readonly Socket _socket;
+
+        /** Request id counter, incremented atomically for every request. */
+        private int _requestId;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ClientSocket" /> class:
+        /// connects to the server and performs the protocol handshake.
+        /// </summary>
+        /// <param name="clientConfiguration">The client configuration.</param>
+        /// <param name="version">Protocol version; the current version is used when null.</param>
+        public ClientSocket(IgniteClientConfiguration clientConfiguration, ClientProtocolVersion? version = null)
+        {
+            Debug.Assert(clientConfiguration != null);
+
+            _socket = Connect(clientConfiguration);
+
+            try
+            {
+                Handshake(_socket, version ?? CurrentProtocolVersion);
+            }
+            catch
+            {
+                // When the constructor throws, the caller never receives an instance to dispose,
+                // so the already connected socket has to be released here.
+                _socket.Dispose();
+
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Performs a send-receive operation.
+        /// </summary>
+        /// <param name="opId">Operation code, written as a short.</param>
+        /// <param name="writeAction">Optional request body writer; skipped when null.</param>
+        /// <param name="readFunc">Optional response reader; default(T) is returned when null.</param>
+        public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction,
+            Func<IBinaryStream, T> readFunc)
+        {
+            var requestId = Interlocked.Increment(ref _requestId);
+
+            var resBytes = SendReceive(_socket, stream =>
+            {
+                stream.WriteShort((short) opId);
+                stream.WriteByte(0);  // Flags (compression, etc)
+                stream.WriteInt(requestId);
+
+                if (writeAction != null)
+                {
+                    writeAction(stream);
+                }
+            });
+
+            using (var stream = new BinaryHeapStream(resBytes))
+            {
+                // The response starts with the id of the request it answers.
+                var resRequestId = stream.ReadInt();
+                Debug.Assert(requestId == resRequestId);
+
+                stream.ReadByte();  // Flags
+
+                if (readFunc != null)
+                {
+                    return readFunc(stream);
+                }
+            }
+
+            return default(T);
+        }
+
+        /// <summary>
+        /// Performs client protocol handshake.
+        /// Throws <see cref="IgniteException"/> when the server does not report success.
+        /// </summary>
+        private static void Handshake(Socket sock, ClientProtocolVersion version)
+        {
+            var res = SendReceive(sock, stream =>
+            {
+                // Handshake.
+                stream.WriteByte(OpHandshake);
+
+                // Protocol version.
+                stream.WriteShort(version.Major);
+                stream.WriteShort(version.Minor);
+                stream.WriteShort(version.Maintenance);
+
+                // Client type: platform.
+                stream.WriteByte(ClientType);
+            }, 20);
+
+            using (var stream = new BinaryHeapStream(res))
+            {
+                var success = stream.ReadBool();
+
+                if (success)
+                {
+                    return;
+                }
+
+                // A failure response carries the server version followed by an error message.
+                var serverVersion =
+                    new ClientProtocolVersion(stream.ReadShort(), stream.ReadShort(), stream.ReadShort());
+
+                var errMsg = BinaryUtils.Marshaller.Unmarshal<string>(stream);
+
+                throw new IgniteException(string.Format(
+                    "Client handhsake failed: '{0}'. Client version: {1}. Server version: {2}",
+                    errMsg, version, serverVersion));
+            }
+        }
+
+        /// <summary>
+        /// Sends the request and receives a response.
+        /// Requests and responses are framed as a 4-byte length followed by the payload.
+        /// </summary>
+        /// <param name="sock">Connected socket; also used as the lock that serializes request/response pairs.</param>
+        /// <param name="writeAction">Writes the request payload.</param>
+        /// <param name="bufSize">Initial size of the request buffer.</param>
+        /// <returns>Response payload without the length prefix.</returns>
+        private static byte[] SendReceive(Socket sock, Action<IBinaryStream> writeAction, int bufSize = 128)
+        {
+            int messageLen;
+            var buf = WriteMessage(writeAction, bufSize, out messageLen);
+
+            lock (sock)
+            {
+                var sent = sock.Send(buf, messageLen, SocketFlags.None);
+                Debug.Assert(sent == messageLen);
+
+                using (var stream = new BinaryHeapStream(ReceiveAll(sock, 4)))
+                {
+                    var size = stream.ReadInt();
+
+                    return ReceiveAll(sock, size);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reads exactly <paramref name="size"/> bytes from the socket.
+        /// A single Receive call may return fewer bytes than requested, so it is repeated
+        /// until the buffer is full.
+        /// </summary>
+        /// <exception cref="SocketException">The connection was closed before all bytes arrived.</exception>
+        private static byte[] ReceiveAll(Socket sock, int size)
+        {
+            var buf = new byte[size];
+            var received = 0;
+
+            while (received < size)
+            {
+                var res = sock.Receive(buf, received, size - received, SocketFlags.None);
+
+                if (res == 0)
+                {
+                    // Zero bytes means the remote side has shut the connection down;
+                    // without this check the loop would never end.
+                    throw new SocketException((int) SocketError.ConnectionAborted);
+                }
+
+                received += res;
+            }
+
+            return buf;
+        }
+
+        /// <summary>
+        /// Writes the message to a byte array.
+        /// </summary>
+        /// <param name="writeAction">Writes the message payload.</param>
+        /// <param name="bufSize">Initial size of the buffer.</param>
+        /// <param name="messageLen">Number of bytes to send, including the 4-byte length prefix.</param>
+        /// <returns>Backing array of the stream; only the first <paramref name="messageLen"/> bytes are meaningful.</returns>
+        private static byte[] WriteMessage(Action<IBinaryStream> writeAction, int bufSize, out int messageLen)
+        {
+            using (var stream = new BinaryHeapStream(bufSize))
+            {
+                stream.WriteInt(0);  // Reserve message size.
+
+                writeAction(stream);
+
+                stream.WriteInt(0, stream.Position - 4);  // Write message size.
+
+                messageLen = stream.Position;
+
+                return stream.GetArray();
+            }
+        }
+
+        /// <summary>
+        /// Connects the socket.
+        /// Tries every endpoint in turn and returns the first socket that connects.
+        /// </summary>
+        /// <exception cref="IgniteException">No endpoints were produced for the configured host.</exception>
+        /// <exception cref="AggregateException">Every endpoint failed; inner exceptions hold the socket errors.</exception>
+        private static Socket Connect(IgniteClientConfiguration cfg)
+        {
+            List<Exception> errors = null;
+
+            foreach (var ipEndPoint in GetEndPoints(cfg))
+            {
+                Socket socket = null;
+
+                try
+                {
+                    // Configure through the local variable rather than an object initializer,
+                    // so that the socket stays reachable for cleanup if a setter throws.
+                    socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+                    socket.SendBufferSize = cfg.SocketSendBufferSize;
+                    socket.ReceiveBufferSize = cfg.SocketReceiveBufferSize;
+                    socket.NoDelay = cfg.TcpNoDelay;
+
+                    socket.Connect(ipEndPoint);
+
+                    return socket;
+                }
+                catch (SocketException e)
+                {
+                    // Release the handle of the failed attempt before trying the next endpoint.
+                    if (socket != null)
+                    {
+                        socket.Dispose();
+                    }
+
+                    if (errors == null)
+                    {
+                        errors = new List<Exception>();
+                    }
+
+                    errors.Add(e);
+                }
+            }
+
+            if (errors == null)
+            {
+                throw new IgniteException("Failed to resolve client host: " + cfg.Host);
+            }
+
+            throw new AggregateException("Failed to establish Ignite thin client connection, " +
+                                         "examine inner exceptions for details.", errors);
+        }
+
+        /// <summary>
+        /// Gets the endpoints: all combinations of IP addresses and ports according to configuration.
+        /// The loopback address is used when no host is configured.
+        /// </summary>
+        private static IEnumerable<IPEndPoint> GetEndPoints(IgniteClientConfiguration cfg)
+        {
+            var addressList = cfg.Host != null
+                ? Dns.GetHostEntry(cfg.Host).AddressList
+                : new[] { IPAddress.Loopback };
+
+            foreach (var ipAddress in addressList)
+            {
+                yield return new IPEndPoint(ipAddress, cfg.Port);
+            }
+        }
+
+        /// <summary>
+        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
+        /// </summary>
+        public void Dispose()
+        {
+            _socket.Dispose();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/757f56a4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
new file mode 100644
index 0000000..4afcdee
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Configuration;
+    using Apache.Ignite.Core.Client;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Datastream;
+    using Apache.Ignite.Core.DataStructures;
+    using Apache.Ignite.Core.Events;
+    using Apache.Ignite.Core.Impl.Cache;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Lifecycle;
+    using Apache.Ignite.Core.Log;
+    using Apache.Ignite.Core.Messaging;
+    using Apache.Ignite.Core.PersistentStore;
+    using Apache.Ignite.Core.Services;
+    using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Thin client implementation of <see cref="IIgnite"/> on top of a <see cref="ClientSocket"/>.
+    /// Only the constructor, <see cref="Dispose"/> and <c>GetCache</c> are implemented;
+    /// every other member throws <see cref="NotImplementedException"/>.
+    /// </summary>
+    internal class IgniteClient : IIgnite
+    {
+        /** Socket. */
+        private readonly ClientSocket _socket;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="IgniteClient"/> class.
+        /// Creates the client socket from the given configuration.
+        /// </summary>
+        /// <param name="clientConfiguration">The client configuration.</param>
+        public IgniteClient(IgniteClientConfiguration clientConfiguration)
+        {
+            Debug.Assert(clientConfiguration != null);
+
+            _socket = new ClientSocket(clientConfiguration);
+        }
+
+        /** <inheritDoc /> */
+        public void Dispose()
+        {
+            // Disposes the client socket created in the constructor.
+            _socket.Dispose();
+        }
+
+        /** <inheritDoc /> */
+        public string Name
+        {
+            get { throw new NotImplementedException(); }
+        }
+
+        /** <inheritDoc /> */
+        public ICluster GetCluster()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICompute GetCompute()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> GetCache<TK, TV>(string name)
+        {
+            IgniteArgumentCheck.NotNull(name, "name");
+
+            // Each call creates a new cache wrapper that shares this client's socket.
+            return new CacheClient<TK, TV>(_socket, name);
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> GetOrCreateCache<TK, TV>(string name)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> GetOrCreateCache<TK, TV>(CacheConfiguration configuration)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> GetOrCreateCache<TK, TV>(CacheConfiguration configuration, NearCacheConfiguration nearConfiguration)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> CreateCache<TK, TV>(string name)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> CreateCache<TK, TV>(CacheConfiguration configuration)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> CreateCache<TK, TV>(CacheConfiguration configuration, NearCacheConfiguration nearConfiguration)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void DestroyCache(string name)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IDataStreamer<TK, TV> GetDataStreamer<TK, TV>(string cacheName)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IBinary GetBinary()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICacheAffinity GetAffinity(string name)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ITransactions GetTransactions()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IMessaging GetMessaging()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IEvents GetEvents()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IServices GetServices()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IAtomicLong GetAtomicLong(string name, long initialValue, bool create)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IAtomicSequence GetAtomicSequence(string name, long initialValue, bool create)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IAtomicReference<T> GetAtomicReference<T>(string name, T initialValue, bool create)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IgniteConfiguration GetConfiguration()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> CreateNearCache<TK, TV>(string name, NearCacheConfiguration configuration)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICache<TK, TV> GetOrCreateNearCache<TK, TV>(string name, NearCacheConfiguration configuration)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<string> GetCacheNames()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ILogger Logger
+        {
+            get { throw new NotImplementedException(); }
+        }
+
+        /** <inheritDoc /> */
+        public event EventHandler Stopping
+        {
+            add { throw new NotImplementedException(); }
+            remove { throw new NotImplementedException(); }
+        }
+
+        /** <inheritDoc /> */
+        public event EventHandler Stopped
+        {
+            add { throw new NotImplementedException(); }
+            remove { throw new NotImplementedException(); }
+        }
+
+        /** <inheritDoc /> */
+        public event EventHandler ClientDisconnected
+        {
+            add { throw new NotImplementedException(); }
+            remove { throw new NotImplementedException(); }
+        }
+
+        /** <inheritDoc /> */
+        public event EventHandler<ClientReconnectEventArgs> ClientReconnected
+        {
+            add { throw new NotImplementedException(); }
+            remove { throw new NotImplementedException(); }
+        }
+
+        /** <inheritDoc /> */
+        public T GetPlugin<T>(string name) where T : class
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void ResetLostPartitions(IEnumerable<string> cacheNames)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void ResetLostPartitions(params string[] cacheNames)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<IMemoryMetrics> GetMemoryMetrics()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IMemoryMetrics GetMemoryMetrics(string memoryPolicyName)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public void SetActive(bool isActive)
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public bool IsActive()
+        {
+            throw new NotImplementedException();
+        }
+
+        /** <inheritDoc /> */
+        public IPersistentStoreMetrics GetPersistentStoreMetrics()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
