This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch ignite-12471-2.8 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 0eb929bf4f7c5f3d69251b8635b62c69b471b7c5 Author: Pavel <[email protected]> AuthorDate: Sat Jan 4 17:31:12 2020 +0300 IGNITE-12471 .NET Thin Client: Fix WithExpiryPolicy crash Old servers do not support the extra flag, which causes exception and disconnect. * Refactor client operation handling to be able to check `ProtocolVersion` of the exact connection that is used for the given operation * Add `ProtocolVersion` check for cache operations with expiry policy. * Fix `ProtocolVersion` check for `CacheConfiguration` --- .../Client/Cache/CreateCacheTest.cs | 4 +- .../Client/ClientConnectionTest.cs | 23 ++ .../Client/ClientProtocolCompatibilityTest.cs | 8 +- .../Client/ClientReconnectCompatibilityTest.cs | 2 +- .../Client/ClientServerCompatibilityTest.cs | 164 +++++++++++++-- .../dotnet/Apache.Ignite.Core.Tests/JavaServer.cs | 53 +++-- .../Apache.Ignite.Core.Tests/JavaServer/pom.xml | 2 +- .../JavaServer/src/main/java/Runner.java | 20 +- .../Apache.Ignite.Core.Tests/ProcessExtensions.cs | 1 + .../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 5 +- .../Client/IgniteClientConfiguration.cs | 4 +- .../Impl/Binary/BinaryProcessorClient.cs | 31 ++- .../Impl/Client/Cache/CacheClient.cs | 232 ++++++++++----------- .../Client/Cache/Query/ClientQueryCursorBase.cs | 5 +- .../Impl/Client/ClientContextBase.cs | 79 +++++++ .../Impl/Client/ClientFailoverSocket.cs | 59 +++--- .../{IClientSocket.cs => ClientRequestContext.cs} | 51 +++-- .../Impl/Client/ClientResponseContext.cs | 51 +++++ .../Apache.Ignite.Core/Impl/Client/ClientSocket.cs | 62 +++--- .../Apache.Ignite.Core/Impl/Client/ClientUtils.cs | 58 ++++++ .../Impl/Client/Cluster/ClientCluster.cs | 58 ++---- .../Apache.Ignite.Core/Impl/Client/IgniteClient.cs | 41 +--- modules/platforms/dotnet/DEVNOTES.txt | 3 +- modules/platforms/dotnet/build.ps1 | 2 +- modules/platforms/dotnet/release/Program.cs | 11 +- 25 files changed, 697 insertions(+), 332 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs index a86f2ea..a9f2825 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CreateCacheTest.cs @@ -195,9 +195,9 @@ namespace Apache.Ignite.Core.Tests.Client.Cache var client = (IgniteClient) Client; // Create cache directly through a socket with only some config properties provided. - client.Socket.DoOutInOp<object>(ClientOp.CacheCreateWithConfiguration, s => + client.Socket.DoOutInOp<object>(ClientOp.CacheCreateWithConfiguration, ctx => { - var w = client.Marshaller.StartMarshal(s); + var w = ctx.Writer; w.WriteInt(2 + 2 + 6 + 2 + 4); // config length in bytes. diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs index 8568423..ec90b6f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs @@ -236,10 +236,33 @@ namespace Apache.Ignite.Core.Tests.Client { Assert.AreEqual("foo", client.GetCacheNames().Single()); } + + // Port range. + cfg = new IgniteClientConfiguration("127.0.0.1:10798..10800"); + + using (var client = Ignition.StartClient(cfg)) + { + Assert.AreEqual("foo", client.GetCacheNames().Single()); + } } } /// <summary> + /// Tests that empty port range causes an exception. + /// </summary> + [Test] + public void TestEmptyPortRangeThrows() + { + var cfg = new IgniteClientConfiguration("127.0.0.1:10800..10700"); + + var ex = Assert.Throws<IgniteClientException>(() => Ignition.StartClient(cfg)); + + Assert.AreEqual( + "Invalid format of IgniteClientConfiguration.Endpoint, port range is empty: 127.0.0.1:10800..10700", + ex.Message); + } + + /// <summary> /// Tests that default configuration throws. /// </summary> [Test] diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs index 4e2b747..ab8219f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientProtocolCompatibilityTest.cs @@ -104,7 +104,7 @@ namespace Apache.Ignite.Core.Tests.Client using (var client = GetClient(version)) { - Assert.AreEqual(ClientSocket.CurrentProtocolVersion, client.ServerVersion); + Assert.AreEqual(ClientSocket.CurrentProtocolVersion, client.Socket.CurrentProtocolVersion); var logs = GetLogs(client); @@ -129,7 +129,7 @@ namespace Apache.Ignite.Core.Tests.Client using (var client = GetClient(version)) { - Assert.AreEqual(version, client.ServerVersion); + Assert.AreEqual(version, client.Socket.CurrentProtocolVersion); var lastLog = GetLogs(client).Last(); var expectedLog = string.Format( @@ -140,7 +140,7 @@ namespace Apache.Ignite.Core.Tests.Client Assert.AreEqual(typeof(ClientSocket).Name, lastLog.Category); } } - + /// <summary> /// Asserts correct exception for cluster operations. /// </summary> @@ -159,7 +159,7 @@ namespace Apache.Ignite.Core.Tests.Client /// <summary> /// Asserts proper exception for non-supported operation. /// </summary> - private static void AssertNotSupportedOperation(Action action, string version, + public static void AssertNotSupportedOperation(Action action, string version, string expectedOperationName) { var ex = Assert.Throws<IgniteClientException>(() => action()); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs index 40580de..f93a621 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientReconnectCompatibilityTest.cs @@ -99,7 +99,7 @@ namespace Apache.Ignite.Core.Tests.Client /// </summary> private static IDisposable StartOldServer() { - return JavaServer.Start("2.4.0"); + return JavaServer.Start(JavaServer.GroupIdIgnite, "2.4.0"); } } } \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs index 0619ed2..05bf804 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientServerCompatibilityTest.cs @@ -18,7 +18,14 @@ namespace Apache.Ignite.Core.Tests.Client { using System; + using System.Threading; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Cache.Expiry; using Apache.Ignite.Core.Client; + using Apache.Ignite.Core.Client.Cache; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Configuration; + using Apache.Ignite.Core.Impl.Client; using Apache.Ignite.Core.Log; using Apache.Ignite.Core.Tests.Client.Cache; using NUnit.Framework; @@ -28,20 +35,23 @@ namespace Apache.Ignite.Core.Tests.Client /// Differs from <see cref="ClientProtocolCompatibilityTest"/>: /// here we actually download and run old Ignite versions instead of changing the protocol version in handshake. /// </summary> - [TestFixture("2.4.0", "1.0.0")] - [TestFixture("2.5.0", "1.1.0")] - [TestFixture("2.6.0", "1.1.0")] - [TestFixture("2.7.0", "1.2.0")] - [TestFixture("2.7.5", "1.2.0")] - [TestFixture("2.7.6", "1.2.0")] + [TestFixture(JavaServer.GroupIdIgnite, "2.4.0", 0)] + [TestFixture(JavaServer.GroupIdIgnite, "2.5.0", 1)] + [TestFixture(JavaServer.GroupIdIgnite, "2.6.0", 1)] + [TestFixture(JavaServer.GroupIdIgnite, "2.7.0", 2)] + [TestFixture(JavaServer.GroupIdIgnite, "2.7.5", 2)] + [TestFixture(JavaServer.GroupIdIgnite, "2.7.6", 2)] [Category(TestUtils.CategoryIntensive)] public class ClientServerCompatibilityTest { /** */ - private readonly string _igniteVersion; + private readonly string _groupId; /** */ - private readonly string _clientProtocolVersion; + private readonly string _serverVersion; + + /** */ + private readonly ClientProtocolVersion _clientProtocolVersion; /** Server node holder. */ private IDisposable _server; @@ -49,10 +59,11 @@ namespace Apache.Ignite.Core.Tests.Client /// <summary> /// Initializes a new instance of <see cref="ClientServerCompatibilityTest"/>. /// </summary> - public ClientServerCompatibilityTest(string igniteVersion, string clientProtocolVersion) + public ClientServerCompatibilityTest(string groupId, string serverVersion, int clientProtocolVersion) { - _igniteVersion = igniteVersion; - _clientProtocolVersion = clientProtocolVersion; + _groupId = groupId; + _serverVersion = serverVersion; + _clientProtocolVersion = new ClientProtocolVersion(1, (short) clientProtocolVersion, 0); } /// <summary> @@ -61,7 +72,7 @@ namespace Apache.Ignite.Core.Tests.Client [TestFixtureSetUp] public void FixtureSetUp() { - _server = JavaServer.Start(_igniteVersion); + _server = JavaServer.Start(_groupId, _serverVersion); } /// <summary> @@ -96,7 +107,7 @@ namespace Apache.Ignite.Core.Tests.Client using (var client = StartClient()) { ClientProtocolCompatibilityTest.TestClusterOperationsThrowCorrectExceptionOnVersionsOlderThan150( - client, _clientProtocolVersion); + client, _clientProtocolVersion.ToString()); } } @@ -108,12 +119,70 @@ namespace Apache.Ignite.Core.Tests.Client { using (var client = StartClient()) { - Assert.IsFalse(client.GetConfiguration().EnableAffinityAwareness); + var expectedPartitionAwareness = _clientProtocolVersion >= ClientSocket.Ver140; + Assert.AreEqual(expectedPartitionAwareness, client.GetConfiguration().EnablePartitionAwareness); + var cache = client.GetOrCreateCache<int, int>(TestContext.CurrentContext.Test.Name); cache.Put(1, 2); Assert.AreEqual(2, cache.Get(1)); } } + + /// <summary> + /// Tests that WithExpiryPolicy throws proper exception on older server versions. + /// </summary> + [Test] + public void TestWithExpiryPolicyThrowCorrectExceptionOnVersionsOlderThan150() + { + if (_clientProtocolVersion >= ClientSocket.Ver150) + { + return; + } + + using (var client = StartClient()) + { + var cache = client.GetOrCreateCache<int, int>(TestContext.CurrentContext.Test.Name); + var cacheWithExpiry = cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(1), null, null)); + + ClientProtocolCompatibilityTest.AssertNotSupportedOperation( + () => cacheWithExpiry.Put(1, 2), _clientProtocolVersion.ToString(), "WithExpiryPolicy"); + } + } + + /// <summary> + /// Tests that server-side configured expiry policy works on all client versions. + /// </summary> + [Test] + public void TestServerSideExpiryPolicyWorksOnAllVersions() + { + using (var client = StartClient()) + { + var cache = client.GetCache<int, int>("twoSecondCache"); + + cache.Put(1, 2); + Assert.True(cache.ContainsKey(1)); + + Thread.Sleep(TimeSpan.FromSeconds(2.1)); + Assert.False(cache.ContainsKey(1)); + } + } + + /// <summary> + /// Tests that CreateCache with all config properties customized works on all versions. + /// </summary> + [Test] + public void TestCreateCacheWithFullConfigWorksOnAllVersions() + { + using (var client = StartClient()) + { + var cache = client.CreateCache<int, Person>(GetFullCacheConfiguration()); + + cache.Put(1, new Person(2)); + + Assert.AreEqual(2, cache.Get(1).Id); + Assert.AreEqual("Person 2", cache[1].Name); + } + } /// <summary> /// Starts the client. @@ -128,5 +197,72 @@ namespace Apache.Ignite.Core.Tests.Client return Ignition.StartClient(cfg); } + + /// <summary> + /// Gets the cache config. + /// </summary> + private static CacheClientConfiguration GetFullCacheConfiguration() + { + return new CacheClientConfiguration + { + Name = Guid.NewGuid().ToString(), + Backups = 3, + AtomicityMode = CacheAtomicityMode.Transactional, + CacheMode = CacheMode.Partitioned, + EagerTtl = false, + EnableStatistics = true, + GroupName = Guid.NewGuid().ToString(), + KeyConfiguration = new[] + { + new CacheKeyConfiguration + { + TypeName = typeof(Person).FullName, + AffinityKeyFieldName = "Name" + } + }, + LockTimeout =TimeSpan.FromSeconds(5), + QueryEntities = new[] + { + new QueryEntity(typeof(int), typeof(Person)) + { + Aliases = new[] + { + new QueryAlias("Person.Name", "PName") + } + } + }, + QueryParallelism = 7, + RebalanceDelay = TimeSpan.FromSeconds(1.5), + RebalanceMode = CacheRebalanceMode.Sync, + RebalanceOrder = 25, + RebalanceThrottle = TimeSpan.FromSeconds(2.3), + RebalanceTimeout = TimeSpan.FromSeconds(42), + SqlSchema = Guid.NewGuid().ToString(), + CopyOnRead = false, + DataRegionName = DataStorageConfiguration.DefaultDataRegionName, + ExpiryPolicyFactory = new TestExpiryPolicyFactory(), + OnheapCacheEnabled = true, + PartitionLossPolicy = PartitionLossPolicy.ReadWriteAll, + ReadFromBackup = false, + RebalanceBatchSize = 100000, + SqlEscapeAll = true, + WriteSynchronizationMode = CacheWriteSynchronizationMode.FullAsync, + MaxConcurrentAsyncOperations = 123, + MaxQueryIteratorsCount = 17, + QueryDetailMetricsSize = 50, + RebalanceBatchesPrefetchCount = 4, + SqlIndexMaxInlineSize = 200000 + }; + } + + /** */ + private class TestExpiryPolicyFactory : IFactory<IExpiryPolicy> + { + /** */ + public IExpiryPolicy CreateInstance() + { + return new ExpiryPolicy(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(3)); + } + } } } \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs index 9249532..6c44ebcb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer.cs @@ -37,6 +37,9 @@ namespace Apache.Ignite.Core.Tests { /** Client port. */ public const int ClientPort = 10890; + + /** Apache Ignite artifact group ID. */ + public const string GroupIdIgnite = "org.apache.ignite"; /** Maven command to execute the main class. */ private const string MavenCommandExec = "compile exec:java -D\"exec.mainClass\"=\"Runner\""; @@ -53,20 +56,22 @@ namespace Apache.Ignite.Core.Tests /// <summary> /// Starts a server node with a given version. /// </summary> + /// <param name="groupId">Maven artifact group id.</param> /// <param name="version">Product version.</param> /// <returns>Disposable object to stop the server.</returns> - public static IDisposable Start(string version) + public static IDisposable Start(string groupId, string version) { IgniteArgumentCheck.NotNullOrEmpty(version, "version"); - ReplaceIgniteVersionInPomFile(version, Path.Combine(JavaServerSourcePath, "pom.xml")); + var pomWrapper = + ReplaceIgniteVersionInPomFile(groupId, version, Path.Combine(JavaServerSourcePath, "pom.xml")); var process = new System.Diagnostics.Process { StartInfo = new ProcessStartInfo { FileName = Os.IsWindows ? "cmd.exe" : "/bin/bash", - Arguments = Os.IsWindows + Arguments = Os.IsWindows ? string.Format("/c \"{0} {1}\"", MavenPath, MavenCommandExec) : string.Format("-c \"{0} {1}\"", MavenPath, MavenCommandExec.Replace("\"", "\\\"")), UseShellExecute = false, @@ -79,35 +84,51 @@ namespace Apache.Ignite.Core.Tests process.Start(); - var listDataReader = new ListDataReader(); - process.AttachProcessConsoleReader(listDataReader, new IgniteProcessConsoleOutputReader()); - - var processWrapper = new DisposeAction(() => process.KillProcessTree()); + var processWrapper = new DisposeAction(() => + { + process.KillProcessTree(); + pomWrapper.Dispose(); + }); - // Wait for node to come up with a thin client connection. - if (WaitForStart()) + try { - return processWrapper; - } + var listDataReader = new ListDataReader(); + process.AttachProcessConsoleReader(listDataReader, new IgniteProcessConsoleOutputReader()); + + // Wait for node to come up with a thin client connection. + if (WaitForStart()) + { + return processWrapper; + } - if (!process.HasExited) + throw new Exception("Failed to start Java node: " + string.Join(",", listDataReader.GetOutput())); + } + catch (Exception) { processWrapper.Dispose(); + throw; } - - throw new Exception("Failed to start Java node: " + string.Join(",", listDataReader.GetOutput())); } /// <summary> /// Updates pom.xml with given Ignite version. /// </summary> - private static void ReplaceIgniteVersionInPomFile(string version, string pomFile) + private static IDisposable ReplaceIgniteVersionInPomFile(string groupId, string version, string pomFile) { var pomContent = File.ReadAllText(pomFile); + var originalPomContent = pomContent; + pomContent = Regex.Replace(pomContent, @"<version>\d+\.\d+\.\d+</version>", string.Format("<version>{0}</version>", version)); + + pomContent = Regex.Replace(pomContent, + @"<groupId>org.*?</groupId>", + string.Format("<groupId>{0}</groupId>", groupId)); + File.WriteAllText(pomFile, pomContent); + + return new DisposeAction(() => File.WriteAllText(pomFile, originalPomContent)); } /// <summary> @@ -139,7 +160,7 @@ namespace Apache.Ignite.Core.Tests { return false; } - }, 60000); + }, 180000); } /// <summary> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml index e2c6022..95b5bf9 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/pom.xml @@ -22,7 +22,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>org.example</groupId> + <groupId>foo-bar</groupId> <artifactId>ignite-maven-server</artifactId> <version>1.0-SNAPSHOT</version> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java index 2fe8467..6a9e5e2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/JavaServer/src/main/java/Runner.java @@ -16,25 +16,39 @@ */ import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; import java.util.Collections; +import java.util.concurrent.TimeUnit; public class Runner { public static void main(String[] args) { ClientConnectorConfiguration connectorConfiguration = new ClientConnectorConfiguration().setPort(10890); TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder() - .setAddresses(Collections.singleton("127.0.0.1:47500..47501")); + .setAddresses(Collections.singleton("127.0.0.1:47500")); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi().setIpFinder(ipFinder).setSocketTimeout(300); + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi() + .setIpFinder(ipFinder) + .setSocketTimeout(300) + .setNetworkTimeout(300); + + CacheConfiguration expiryCacheCfg = new CacheConfiguration("twoSecondCache") + .setExpiryPolicyFactory(FactoryBuilder.factoryOf( + new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 2)))); IgniteConfiguration cfg = new IgniteConfiguration() .setClientConnectorConfiguration(connectorConfiguration) - .setDiscoverySpi(discoSpi); + .setDiscoverySpi(discoSpi) + .setCacheConfiguration(expiryCacheCfg) + .setLocalHost("127.0.0.1"); Ignition.start(cfg); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs index a3255c4..4785694 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs @@ -122,6 +122,7 @@ namespace Apache.Ignite.Core.Tests if (Os.IsWindows) { Execute("taskkill", string.Format("/T /F /PID {0}", process.Id)); + process.WaitForExit(); } else { diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 862ced0..affc575 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -83,7 +83,11 @@ <Compile Include="Failure\StopNodeOrHaltFailureHandler.cs" /> <Compile Include="Impl\Binary\BinaryHashCodeUtils.cs" /> <Compile Include="Impl\Cache\QueryMetricsImpl.cs" /> + <Compile Include="Impl\Client\ClientContextBase.cs" /> <Compile Include="Impl\Client\ClientOpExtensions.cs" /> + <Compile Include="Impl\Client\ClientRequestContext.cs" /> + <Compile Include="Impl\Client\ClientResponseContext.cs" /> + <Compile Include="Impl\Client\ClientUtils.cs" /> <Compile Include="Impl\Client\Cluster\ClientCluster.cs" /> <Compile Include="Impl\Client\Cache\ClientCacheAffinityAwarenessGroup.cs" /> <Compile Include="Impl\Client\Cache\ClientCachePartitionMap.cs" /> @@ -124,7 +128,6 @@ <Compile Include="Client\ClientStatusCode.cs" /> <Compile Include="Events\LocalEventListener.cs" /> <Compile Include="Impl\Client\ClientFailoverSocket.cs" /> - <Compile Include="Impl\Client\IClientSocket.cs" /> <Compile Include="Impl\Cluster\BaselineNode.cs" /> <Compile Include="Ssl\SslContextFactory.cs" /> <Compile Include="Impl\Ssl\SslFactorySerializer.cs" /> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs index c3fc405..84bd595 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs @@ -139,10 +139,10 @@ namespace Apache.Ignite.Core.Client /// Examples of supported formats: /// * 192.168.1.25 (default port is used, see <see cref="DefaultPort"/>). /// * 192.168.1.25:780 (custom port) - /// * 192.168.1.25:780-787 (custom port range) + /// * 192.168.1.25:780..787 (custom port range) /// * my-host.com (default port is used, see <see cref="DefaultPort"/>). /// * my-host.com:780 (custom port) - /// * my-host.com:780-787 (custom port range) + /// * my-host.com:780..787 (custom port range) /// <para /> /// When multiple endpoints are specified, failover and load-balancing mechanism is enabled: /// * Ignite picks random endpoint and connects to it. diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs index d68f66e..9e0ce75 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs @@ -32,16 +32,13 @@ namespace Apache.Ignite.Core.Impl.Binary private const byte DotNetPlatformId = 1; /** Socket. */ - private readonly IClientSocket _socket; - - /** Marshaller. */ - private readonly Marshaller _marsh = BinaryUtils.Marshaller; + private readonly ClientFailoverSocket _socket; /// <summary> /// Initializes a new instance of the <see cref="BinaryProcessorClient"/> class. /// </summary> /// <param name="socket">The socket.</param> - public BinaryProcessorClient(IClientSocket socket) + public BinaryProcessorClient(ClientFailoverSocket socket) { Debug.Assert(socket != null); @@ -51,8 +48,8 @@ namespace Apache.Ignite.Core.Impl.Binary /** <inheritdoc /> */ public BinaryType GetBinaryType(int typeId) { - return _socket.DoOutInOp(ClientOp.BinaryTypeGet, s => s.WriteInt(typeId), - s => s.ReadBool() ? new BinaryType(_marsh.StartUnmarshal(s), true) : null); + return _socket.DoOutInOp(ClientOp.BinaryTypeGet, ctx => ctx.Stream.WriteInt(typeId), + ctx => ctx.Stream.ReadBool() ? new BinaryType(ctx.Reader, true) : null); } /** <inheritdoc /> */ @@ -77,19 +74,19 @@ namespace Apache.Ignite.Core.Impl.Binary var type = binaryType; // Access to modified closure. _socket.DoOutInOp<object>(ClientOp.BinaryTypePut, - s => BinaryProcessor.WriteBinaryType(_marsh.StartMarshal(s), type), null); + ctx => BinaryProcessor.WriteBinaryType(ctx.Writer, type), null); } } /** <inheritdoc /> */ public bool RegisterType(int id, string typeName) { - return _socket.DoOutInOp(ClientOp.BinaryTypeNamePut, s => + return _socket.DoOutInOp(ClientOp.BinaryTypeNamePut, ctx => { - s.WriteByte(DotNetPlatformId); - s.WriteInt(id); - _marsh.StartMarshal(s).WriteString(typeName); - }, s => s.ReadBool()); + ctx.Stream.WriteByte(DotNetPlatformId); + ctx.Stream.WriteInt(id); + ctx.Writer.WriteString(typeName); + }, ctx => ctx.Stream.ReadBool()); } /** <inheritdoc /> */ @@ -101,12 +98,12 @@ namespace Apache.Ignite.Core.Impl.Binary /** <inheritdoc /> */ public string GetTypeName(int id) { - return _socket.DoOutInOp(ClientOp.BinaryTypeNameGet, s => + return _socket.DoOutInOp(ClientOp.BinaryTypeNameGet, ctx => { - s.WriteByte(DotNetPlatformId); - s.WriteInt(id); + ctx.Stream.WriteByte(DotNetPlatformId); + ctx.Stream.WriteInt(id); }, - s => _marsh.StartUnmarshal(s).ReadString()); + ctx => ctx.Reader.ReadString()); } } } \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs index 9f186bb..ec8a7fb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs @@ -134,7 +134,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpAffinity(ClientOp.CacheGet, key, UnmarshalNotNull<TV>); + return DoOutInOpAffinity(ClientOp.CacheGet, key, ctx => UnmarshalNotNull<TV>(ctx)); } /** <inheritDoc /> */ @@ -142,7 +142,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, w => w.WriteObjectDetached(key), UnmarshalNotNull<TV>); + return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, ctx => ctx.Writer.WriteObjectDetached(key), + ctx => UnmarshalNotNull<TV>(ctx)); } /** <inheritDoc /> */ @@ -162,7 +163,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, w => w.WriteObjectDetached(key), + return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, ctx => ctx.Writer.WriteObjectDetached(key), UnmarshalCacheResult<TV>); } @@ -171,7 +172,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s)); + return DoOutInOp(ClientOp.CacheGetAll, ctx => ctx.Writer.WriteEnumerable(keys), + s => ReadCacheEntries(s.Stream)); } /** <inheritDoc /> */ @@ -179,7 +181,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOpAsync(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s)); + return DoOutInOpAsync(ClientOp.CacheGetAll, ctx => ctx.Writer.WriteEnumerable(keys), + s => ReadCacheEntries(s.Stream)); } /** <inheritDoc /> */ @@ -188,7 +191,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - DoOutOpAffinity(ClientOp.CachePut, key, val); + DoOutInOpAffinity<object>(ClientOp.CachePut, key, val, null); } /** <inheritDoc /> */ @@ -197,7 +200,10 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOpAffinityAsync(ClientOp.CachePut, key, w => WriteKeyVal(w, key, val)); + return DoOutOpAffinityAsync(ClientOp.CachePut, key, ctx => { + ctx.Writer.WriteObjectDetached(key); + ctx.Writer.WriteObjectDetached(val); + }); } /** <inheritDoc /> */ @@ -205,7 +211,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpAffinity(ClientOp.CacheContainsKey, key, r => r.ReadBool()); + return DoOutInOpAffinity(ClientOp.CacheContainsKey, key, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -213,7 +219,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpAffinityAsync(ClientOp.CacheContainsKey, key, r => r.ReadBool()); + return DoOutInOpAffinityAsync(ClientOp.CacheContainsKey, key, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -221,7 +227,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp(ClientOp.CacheContainsKeys, w => w.WriteEnumerable(keys), r => r.ReadBool()); + return DoOutInOp(ClientOp.CacheContainsKeys, ctx => ctx.Writer.WriteEnumerable(keys), + ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -229,7 +236,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOpAsync(ClientOp.CacheContainsKeys, w => w.WriteEnumerable(keys), r => r.ReadBool()); + return DoOutInOpAsync(ClientOp.CacheContainsKeys, ctx => ctx.Writer.WriteEnumerable(keys), + ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -239,9 +247,9 @@ namespace Apache.Ignite.Core.Impl.Client.Cache // Filter is a binary object for all platforms. // For .NET it is a CacheEntryFilterHolder with a predefined id (BinaryTypeId.CacheEntryPredicateHolder). - return DoOutInOp(ClientOp.QueryScan, w => WriteScanQuery(w, scanQuery), - s => new ClientQueryCursor<TK, TV>( - _ignite, s.ReadLong(), _keepBinary, s, ClientOp.QueryScanCursorGetPage)); + return DoOutInOp(ClientOp.QueryScan, w => WriteScanQuery(w.Writer, scanQuery), + ctx => new ClientQueryCursor<TK, TV>( + _ignite, ctx.Stream.ReadLong(), _keepBinary, ctx.Stream, ClientOp.QueryScanCursorGetPage)); } /** <inheritDoc /> */ @@ -252,9 +260,9 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(sqlQuery.Sql, "sqlQuery.Sql"); IgniteArgumentCheck.NotNull(sqlQuery.QueryType, "sqlQuery.QueryType"); - return DoOutInOp(ClientOp.QuerySql, w => WriteSqlQuery(w, sqlQuery), - s => new ClientQueryCursor<TK, TV>( - _ignite, s.ReadLong(), _keepBinary, s, ClientOp.QuerySqlCursorGetPage)); + return DoOutInOp(ClientOp.QuerySql, w => WriteSqlQuery(w.Writer, sqlQuery), + ctx => new ClientQueryCursor<TK, TV>( + _ignite, ctx.Stream.ReadLong(), _keepBinary, ctx.Stream, ClientOp.QuerySqlCursorGetPage)); } /** <inheritDoc /> */ @@ -264,16 +272,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(sqlFieldsQuery.Sql, "sqlFieldsQuery.Sql"); return DoOutInOp(ClientOp.QuerySqlFields, - w => WriteSqlFieldsQuery(w, sqlFieldsQuery), - s => GetFieldsCursor(s)); + ctx => WriteSqlFieldsQuery(ctx.Writer, sqlFieldsQuery), + ctx => GetFieldsCursor(ctx)); } /** <inheritDoc /> */ public IQueryCursor<T> Query<T>(SqlFieldsQuery sqlFieldsQuery, Func<IBinaryRawReader, int, T> readerFunc) { return DoOutInOp(ClientOp.QuerySqlFields, - w => WriteSqlFieldsQuery(w, sqlFieldsQuery, false), - s => GetFieldsCursorNoColumnNames(s, readerFunc)); + ctx => WriteSqlFieldsQuery(ctx.Writer, sqlFieldsQuery, false), + ctx => GetFieldsCursorNoColumnNames(ctx.Stream, readerFunc)); } /** <inheritDoc /> */ @@ -334,7 +342,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpAffinity(ClientOp.CachePutIfAbsent, key, val, s => s.ReadBool()); + return DoOutInOpAffinity(ClientOp.CachePutIfAbsent, key, val, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -343,7 +351,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpAffinityAsync(ClientOp.CachePutIfAbsent, key, val, s => s.ReadBool()); + return DoOutInOpAffinityAsync(ClientOp.CachePutIfAbsent, key, val, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -370,7 +378,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpAffinity(ClientOp.CacheReplace, key, val, s => s.ReadBool()); + return DoOutInOpAffinity(ClientOp.CacheReplace, key, val, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -379,7 +387,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpAffinityAsync(ClientOp.CacheReplace, key, val, s => s.ReadBool()); + return DoOutInOpAffinityAsync(ClientOp.CacheReplace, key, val, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -389,12 +397,12 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(oldVal, "oldVal"); IgniteArgumentCheck.NotNull(newVal, "newVal"); - return DoOutInOpAffinity(ClientOp.CacheReplaceIfEquals, key, w => + return DoOutInOpAffinity(ClientOp.CacheReplaceIfEquals, key, ctx => { - w.WriteObjectDetached(key); - w.WriteObjectDetached(oldVal); - w.WriteObjectDetached(newVal); - }, s => s.ReadBool()); + ctx.Writer.WriteObjectDetached(key); + ctx.Writer.WriteObjectDetached(oldVal); + ctx.Writer.WriteObjectDetached(newVal); + }, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -404,12 +412,12 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(oldVal, "oldVal"); IgniteArgumentCheck.NotNull(newVal, "newVal"); - return DoOutInOpAffinityAsync(ClientOp.CacheReplaceIfEquals, key, w => + return DoOutInOpAffinityAsync(ClientOp.CacheReplaceIfEquals, key, ctx => { - w.WriteObjectDetached(key); - w.WriteObjectDetached(oldVal); - w.WriteObjectDetached(newVal); - }, s => s.ReadBool()); + ctx.Writer.WriteObjectDetached(key); + ctx.Writer.WriteObjectDetached(oldVal); + ctx.Writer.WriteObjectDetached(newVal); + }, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -417,7 +425,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(vals, "vals"); - DoOutOp(ClientOp.CachePutAll, w => w.WriteDictionary(vals)); + DoOutOp(ClientOp.CachePutAll, ctx => ctx.Writer.WriteDictionary(vals)); } /** <inheritDoc /> */ @@ -425,7 +433,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(vals, "vals"); - return DoOutOpAsync(ClientOp.CachePutAll, w => w.WriteDictionary(vals)); + return DoOutOpAsync(ClientOp.CachePutAll, ctx => ctx.Writer.WriteDictionary(vals)); } /** <inheritDoc /> */ @@ -453,7 +461,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOpAffinityAsync(ClientOp.CacheClearKey, key, w => w.WriteObjectDetached(key)); + return DoOutOpAffinityAsync(ClientOp.CacheClearKey, key, ctx => ctx.Writer.WriteObjectDetached(key)); } /** <inheritDoc /> */ @@ -461,7 +469,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp(ClientOp.CacheClearKeys, w => w.WriteEnumerable(keys)); + DoOutOp(ClientOp.CacheClearKeys, ctx => ctx.Writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -469,7 +477,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOpAsync(ClientOp.CacheClearKeys, w => w.WriteEnumerable(keys)); + return DoOutOpAsync(ClientOp.CacheClearKeys, ctx => ctx.Writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -477,7 +485,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpAffinity(ClientOp.CacheRemoveKey, key, r => r.ReadBool()); + return DoOutInOpAffinity(ClientOp.CacheRemoveKey, key, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -485,7 +493,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpAffinityAsync(ClientOp.CacheRemoveKey, key, r => r.ReadBool()); + return DoOutInOpAffinityAsync(ClientOp.CacheRemoveKey, key, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -494,7 +502,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpAffinity(ClientOp.CacheRemoveIfEquals, key, val, r => r.ReadBool()); + return DoOutInOpAffinity(ClientOp.CacheRemoveIfEquals, key, val, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -503,7 +511,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpAffinityAsync(ClientOp.CacheRemoveIfEquals, key, val, r => r.ReadBool()); + return DoOutInOpAffinityAsync(ClientOp.CacheRemoveIfEquals, key, val, ctx => ctx.Stream.ReadBool()); } /** <inheritDoc /> */ @@ -511,7 +519,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp(ClientOp.CacheRemoveKeys, w => w.WriteEnumerable(keys)); + DoOutOp(ClientOp.CacheRemoveKeys, ctx => ctx.Writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -519,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOpAsync(ClientOp.CacheRemoveKeys, w => w.WriteEnumerable(keys)); + return DoOutOpAsync(ClientOp.CacheRemoveKeys, ctx => ctx.Writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -537,20 +545,22 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /** <inheritDoc /> */ public long GetSize(params CachePeekMode[] modes) { - return DoOutInOp(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong()); + return DoOutInOp(ClientOp.CacheGetSize, w => WritePeekModes(modes, w.Stream), + ctx => ctx.Stream.ReadLong()); } /** <inheritDoc /> */ public Task<long> GetSizeAsync(params CachePeekMode[] modes) { - return DoOutInOpAsync(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong()); + return DoOutInOpAsync(ClientOp.CacheGetSize, w => WritePeekModes(modes, w.Stream), + ctx => ctx.Stream.ReadLong()); } /** <inheritDoc /> */ public CacheClientConfiguration GetConfiguration() { return DoOutInOp(ClientOp.CacheGetConfiguration, null, - s => new CacheClientConfiguration(s, _ignite.ServerVersion)); + ctx => new CacheClientConfiguration(ctx.Stream, ctx.ProtocolVersion)); } /** <inheritDoc /> */ @@ -584,6 +594,10 @@ namespace Apache.Ignite.Core.Impl.Client.Cache { IgniteArgumentCheck.NotNull(plc, "plc"); + // WithExpiryPolicy is not supported on protocols older than 1.5.0. + // However, we can't check that here because of partition awareness, reconnect and so on: + // We don't know which connection is going to be used. This connection may not even exist yet. + // See WriteRequest. return new CacheClient<TK, TV>(_ignite, _name, _keepBinary, plc); } @@ -599,7 +613,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out op. /// </summary> - private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null) + private void DoOutOp(ClientOp opId, Action<ClientRequestContext> writeAction = null) { DoOutInOp<object>(opId, writeAction, null); } @@ -615,15 +629,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out op with affinity awareness. /// </summary> - private void DoOutOpAffinity(ClientOp opId, TK key, TV val) - { - DoOutInOpAffinity<object>(opId, key, val, null); - } - - /// <summary> - /// Does the out op with affinity awareness. - /// </summary> - private Task DoOutOpAsync(ClientOp opId, Action<BinaryWriter> writeAction = null) + private Task DoOutOpAsync(ClientOp opId, Action<ClientRequestContext> writeAction = null) { return DoOutInOpAsync<object>(opId, writeAction, null); } @@ -631,7 +637,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out op with affinity awareness. /// </summary> - private Task DoOutOpAffinityAsync(ClientOp opId, TK key, Action<BinaryWriter> writeAction = null) + private Task DoOutOpAffinityAsync(ClientOp opId, TK key, Action<ClientRequestContext> writeAction = null) { return DoOutInOpAffinityAsync<object>(opId, key, writeAction, null); } @@ -639,21 +645,21 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out in op. /// </summary> - private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction, - Func<IBinaryStream, T> readFunc) + private T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc) { - return _ignite.Socket.DoOutInOp(opId, stream => WriteRequest(writeAction, stream), + return _ignite.Socket.DoOutInOp(opId, ctx => WriteRequest(writeAction, ctx), readFunc, HandleError<T>); } /// <summary> /// Does the out in op with affinity awareness. /// </summary> - private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Func<IBinaryStream, T> readFunc) + private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Func<ClientResponseContext, T> readFunc) { return _ignite.Socket.DoOutInOpAffinity( opId, - stream => WriteRequest(w => w.WriteObjectDetached(key), stream), + ctx => WriteRequest(c => c.Writer.WriteObjectDetached(key), ctx), readFunc, _id, key, @@ -663,12 +669,12 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out in op with affinity awareness. /// </summary> - private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Action<BinaryWriter> writeAction, - Func<IBinaryStream, T> readFunc) + private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc) { return _ignite.Socket.DoOutInOpAffinity( opId, - stream => WriteRequest(writeAction, stream), + ctx => WriteRequest(writeAction, ctx), readFunc, _id, key, @@ -678,15 +684,15 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out in op with affinity awareness. /// </summary> - private T DoOutInOpAffinity<T>(ClientOp opId, TK key, TV val, Func<IBinaryStream, T> readFunc) + private T DoOutInOpAffinity<T>(ClientOp opId, TK key, TV val, Func<ClientResponseContext, T> readFunc) { return _ignite.Socket.DoOutInOpAffinity( opId, - stream => WriteRequest(w => + ctx => WriteRequest(c => { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }, stream), + c.Writer.WriteObjectDetached(key); + c.Writer.WriteObjectDetached(val); + }, ctx), readFunc, _id, key, @@ -696,35 +702,35 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out in op. /// </summary> - private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<BinaryWriter> writeAction, - Func<IBinaryStream, T> readFunc) + private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc) { - return _ignite.Socket.DoOutInOpAsync(opId, stream => WriteRequest(writeAction, stream), + return _ignite.Socket.DoOutInOpAsync(opId, ctx => WriteRequest(writeAction, ctx), readFunc, HandleError<T>); } /// <summary> /// Does the out in op with affinity awareness. /// </summary> - private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Action<BinaryWriter> writeAction, - Func<IBinaryStream, T> readFunc) + private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc) { - return _ignite.Socket.DoOutInOpAffinityAsync(opId, stream => WriteRequest(writeAction, stream), + return _ignite.Socket.DoOutInOpAffinityAsync(opId, ctx => WriteRequest(writeAction, ctx), readFunc, _id, key, HandleError<T>); } /// <summary> /// Does the out in op with affinity awareness. /// </summary> - private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, TV val, Func<IBinaryStream, T> readFunc) + private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, TV val, Func<ClientResponseContext, T> readFunc) { return _ignite.Socket.DoOutInOpAffinityAsync( opId, - stream => WriteRequest(w => + ctx => WriteRequest(c => { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }, stream), + c.Writer.WriteObjectDetached(key); + c.Writer.WriteObjectDetached(val); + }, ctx), readFunc, _id, key, @@ -734,43 +740,45 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Does the out in op with affinity awareness. /// </summary> - private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Func<IBinaryStream, T> readFunc) + private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Func<ClientResponseContext, T> readFunc) { return _ignite.Socket.DoOutInOpAffinityAsync(opId, - stream => WriteRequest(w => w.WriteObjectDetached(key), stream), + stream => WriteRequest(w => w.Writer.WriteObjectDetached(key), stream), readFunc, _id, key, HandleError<T>); } /// <summary> /// Writes the request. /// </summary> - private void WriteRequest(Action<BinaryWriter> writeAction, IBinaryStream stream) + private void WriteRequest(Action<ClientRequestContext> writeAction, ClientRequestContext ctx) { - stream.WriteInt(_id); + ctx.Stream.WriteInt(_id); - var writer = _marsh.StartMarshal(stream); if (_expiryPolicy != null) { - stream.WriteByte((byte) ClientCacheRequestFlag.WithExpiryPolicy); - ExpiryPolicySerializer.WritePolicy(writer, _expiryPolicy); + // Check whether WithExpiryPolicy is supported by the protocol here - + // ctx.ProtocolVersion refers to exact connection for this request. + ClientUtils.ValidateOp( + ClientCacheRequestFlag.WithExpiryPolicy, ctx.ProtocolVersion, ClientSocket.Ver150); + + ctx.Stream.WriteByte((byte) ClientCacheRequestFlag.WithExpiryPolicy); + ExpiryPolicySerializer.WritePolicy(ctx.Writer, _expiryPolicy); } else - stream.WriteByte((byte) ClientCacheRequestFlag.None); // Flags (skipStore, etc). + ctx.Stream.WriteByte((byte) ClientCacheRequestFlag.None); // Flags (skipStore, etc). if (writeAction != null) { - - writeAction(writer); - - _marsh.FinishMarshal(writer); + writeAction(ctx); } } /// <summary> /// Unmarshals the value, throwing an exception for nulls. /// </summary> - private T UnmarshalNotNull<T>(IBinaryStream stream) + private T UnmarshalNotNull<T>(ClientResponseContext ctx) { + var stream = ctx.Stream; var hdr = stream.ReadByte(); if (hdr == BinaryUtils.HdrNull) @@ -786,8 +794,9 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Unmarshals the value, wrapping in a cache result. /// </summary> - private CacheResult<T> UnmarshalCacheResult<T>(IBinaryStream stream) + private CacheResult<T> UnmarshalCacheResult<T>(ClientResponseContext ctx) { + var stream = ctx.Stream; var hdr = stream.ReadByte(); if (hdr == BinaryUtils.HdrNull) @@ -882,25 +891,25 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Gets the fields cursor. /// </summary> - private ClientFieldsQueryCursor GetFieldsCursor(IBinaryStream s) + private ClientFieldsQueryCursor GetFieldsCursor(ClientResponseContext ctx) { - var cursorId = s.ReadLong(); - var columnNames = ClientFieldsQueryCursor.ReadColumns(_marsh.StartUnmarshal(s)); + var cursorId = ctx.Stream.ReadLong(); + var columnNames = ClientFieldsQueryCursor.ReadColumns(ctx.Reader); - return new ClientFieldsQueryCursor(_ignite, cursorId, _keepBinary, s, + return new ClientFieldsQueryCursor(_ignite, cursorId, _keepBinary, ctx.Stream, ClientOp.QuerySqlFieldsCursorGetPage, columnNames); } /// <summary> /// Gets the fields cursor. /// </summary> - private ClientQueryCursorBase<T> GetFieldsCursorNoColumnNames<T>(IBinaryStream s, + private ClientQueryCursorBase<T> GetFieldsCursorNoColumnNames<T>(IBinaryStream stream, Func<IBinaryRawReader, int, T> readerFunc) { - var cursorId = s.ReadLong(); - var columnCount = s.ReadInt(); + var cursorId = stream.ReadLong(); + var columnCount = stream.ReadInt(); - return new ClientQueryCursorBase<T>(_ignite, cursorId, _keepBinary, s, + return new ClientQueryCursorBase<T>(_ignite, cursorId, _keepBinary, stream, ClientOp.QuerySqlFieldsCursorGetPage, r => readerFunc(r, columnCount)); } @@ -930,7 +939,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache /// <summary> /// Writes the peek modes. /// </summary> - private static void WritePeekModes(ICollection<CachePeekMode> modes, IBinaryRawWriter w) + private static void WritePeekModes(ICollection<CachePeekMode> modes, IBinaryStream w) { if (modes == null) { @@ -973,14 +982,5 @@ namespace Apache.Ignite.Core.Impl.Client.Cache return res; } - - /// <summary> - /// Writes key and value. - /// </summary> - private static void WriteKeyVal(BinaryWriter w, TK key, TV val) - { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs index 5a0a1f6..5f70126 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/Query/ClientQueryCursorBase.cs @@ -70,7 +70,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache.Query /** <inheritdoc /> */ protected override T[] GetBatch() { - return _ignite.Socket.DoOutInOp(_getPageOp, w => w.WriteLong(_cursorId), s => ConvertGetBatch(s)); + return _ignite.Socket.DoOutInOp(_getPageOp, ctx => ctx.Stream.WriteLong(_cursorId), + ctx => ConvertGetBatch(ctx.Stream)); } /** <inheritdoc /> */ @@ -78,7 +79,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache.Query { try { - _ignite.Socket.DoOutInOp<object>(ClientOp.ResourceClose, w => w.WriteLong(_cursorId), null); + _ignite.Socket.DoOutInOp<object>(ClientOp.ResourceClose, ctx => ctx.Writer.WriteLong(_cursorId), null); } finally { diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientContextBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientContextBase.cs new file mode 100644 index 0000000..9b0b8a0 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientContextBase.cs @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Client +{ + using System.Diagnostics; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + + /// <summary> + /// Base class for client context. + /// </summary> + internal abstract class ClientContextBase + { + /** */ + private readonly IBinaryStream _stream; + + /** */ + private readonly Marshaller _marshaller; + + /** */ + private readonly ClientProtocolVersion _protocolVersion; + + /// <summary> + /// Initializes a new instance of <see cref="ClientContextBase"/> class. + /// </summary> + /// <param name="stream">Stream.</param> + /// <param name="marshaller">Marshaller.</param> + /// <param name="protocolVersion">Protocol version to be used for this request.</param> + protected ClientContextBase(IBinaryStream stream, Marshaller marshaller, ClientProtocolVersion protocolVersion) + { + Debug.Assert(stream != null); + Debug.Assert(marshaller != null); + + _stream = stream; + _marshaller = marshaller; + _protocolVersion = protocolVersion; + } + + /// <summary> + /// Stream. + /// </summary> + public IBinaryStream Stream + { + get { return _stream; } + } + + /// <summary> + /// Gets the marshaller. + /// </summary> + public Marshaller Marshaller + { + get { return _marshaller; } + } + + /// <summary> + /// Protocol version to be used for this request. + /// (Takes partition awareness, failover and reconnect into account). + /// </summary> + public ClientProtocolVersion ProtocolVersion + { + get { return _protocolVersion; } + } + } +} \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs index 578a14e..cb6f868 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs @@ -37,7 +37,7 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Socket wrapper with reconnect/failover functionality: reconnects on failure. /// </summary> - internal class ClientFailoverSocket : IClientSocket + internal class ClientFailoverSocket : IDisposable { /** Underlying socket. */ private ClientSocket _socket; @@ -108,8 +108,11 @@ namespace Apache.Ignite.Core.Impl.Client Connect(); } - /** <inheritdoc /> */ - public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readFunc, + /// <summary> + /// Performs a send-receive operation. + /// </summary> + public T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) { return GetSocket().DoOutInOp(opId, writeAction, readFunc, errorFunc); @@ -120,8 +123,8 @@ namespace Apache.Ignite.Core.Impl.Client /// </summary> public T DoOutInOpAffinity<T, TKey>( ClientOp opId, - Action<IBinaryStream> writeAction, - Func<IBinaryStream, T> readFunc, + Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc, int cacheId, TKey key, Func<ClientStatusCode, string, T> errorFunc = null) @@ -136,8 +139,8 @@ namespace Apache.Ignite.Core.Impl.Client /// </summary> public Task<T> DoOutInOpAffinityAsync<T, TKey>( ClientOp opId, - Action<IBinaryStream> writeAction, - Func<IBinaryStream, T> readFunc, + Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc, int cacheId, TKey key, Func<ClientStatusCode, string, T> errorFunc = null) @@ -147,39 +150,45 @@ namespace Apache.Ignite.Core.Impl.Client return socket.DoOutInOpAsync(opId, writeAction, readFunc, errorFunc); } - /** <inheritdoc /> */ - public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) + /// <summary> + /// Performs an async send-receive operation. + /// </summary> + public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) { return GetSocket().DoOutInOpAsync(opId, writeAction, readFunc, errorFunc); } - /** <inheritdoc /> */ - public ClientProtocolVersion ServerVersion + /// <summary> + /// Gets the current protocol version. + /// Only used for tests. + /// </summary> + public ClientProtocolVersion CurrentProtocolVersion { get { return GetSocket().ServerVersion; } } - /** <inheritdoc /> */ + /// <summary> + /// Gets the remote endpoint. + /// </summary> public EndPoint RemoteEndPoint { get { - lock (_syncRoot) - { - return _socket != null ? _socket.RemoteEndPoint : null; - } + var socket = _socket; + return socket != null ? socket.RemoteEndPoint : null; } } - /** <inheritdoc /> */ + /// <summary> + /// Gets the local endpoint. + /// </summary> public EndPoint LocalEndPoint { get { - lock (_syncRoot) - { - return _socket != null ? _socket.LocalEndPoint : null; - } + var socket = _socket; + return socket != null ? socket.LocalEndPoint : null; } } @@ -298,7 +307,7 @@ namespace Apache.Ignite.Core.Impl.Client try { _socket = new ClientSocket(_config, endPoint.EndPoint, endPoint.Host, - _config.ProtocolVersion, OnAffinityTopologyVersionChange); + _config.ProtocolVersion, OnAffinityTopologyVersionChange, _marsh); endPoint.Socket = _socket; @@ -412,8 +421,8 @@ namespace Apache.Ignite.Core.Impl.Client DoOutInOp( ClientOp.CachePartitions, - s => WriteDistributionMapRequest(cacheId, s), - s => ReadDistributionMapResponse(s)); + s => WriteDistributionMapRequest(cacheId, s.Stream), + s => ReadDistributionMapResponse(s.Stream)); } } @@ -521,7 +530,7 @@ namespace Apache.Ignite.Core.Impl.Client try { var socket = new ClientSocket(_config, endPoint.EndPoint, endPoint.Host, - _config.ProtocolVersion, OnAffinityTopologyVersionChange); + _config.ProtocolVersion, OnAffinityTopologyVersionChange, _marsh); endPoint.Socket = socket; } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientRequestContext.cs similarity index 50% rename from modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IClientSocket.cs rename to modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientRequestContext.cs index c81f45f..d291136 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IClientSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientRequestContext.cs @@ -17,42 +17,47 @@ namespace Apache.Ignite.Core.Impl.Client { - using System; - using System.Net; - using System.Threading.Tasks; - using Apache.Ignite.Core.Client; + using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; /// <summary> - /// Wrapper over framework socket for Ignite thin client operations. + /// Request context. /// </summary> - internal interface IClientSocket : IDisposable + internal sealed class ClientRequestContext : ClientContextBase { - /// <summary> - /// Performs a send-receive operation. - /// </summary> - T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction, - Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null); + /** */ + private BinaryWriter _writer; /// <summary> - /// Performs a send-receive operation asynchronously. + /// Initializes a new instance of <see cref="ClientRequestContext"/> class. /// </summary> - Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction, - Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null); + /// <param name="stream">Stream.</param> + /// <param name="marshaller">Marshaller.</param> + /// <param name="protocolVersion">Protocol version to be used for this request.</param> + public ClientRequestContext(IBinaryStream stream, Marshaller marshaller, ClientProtocolVersion protocolVersion) + : base(stream, marshaller, protocolVersion) - /// <summary> - /// Gets the server version. - /// </summary> - ClientProtocolVersion ServerVersion { get; } + { + // No-op. + } /// <summary> - /// Gets the current remote EndPoint. + /// Writer. /// </summary> - EndPoint RemoteEndPoint { get; } + public BinaryWriter Writer + { + get { return _writer ?? (_writer = Marshaller.StartMarshal(Stream)); } + } /// <summary> - /// Gets the current local EndPoint. + /// Finishes marshal session for this request (if any). /// </summary> - EndPoint LocalEndPoint { get; } + public void FinishMarshal() + { + if (_writer != null) + { + Marshaller.FinishMarshal(_writer); + } + } } -} +} \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientResponseContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientResponseContext.cs new file mode 100644 index 0000000..1ef1419 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientResponseContext.cs @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Client +{ + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + + /// <summary> + /// Response context. + /// </summary> + internal sealed class ClientResponseContext : ClientContextBase + { + /** */ + private BinaryReader _reader; + + /// <summary> + /// Initializes a new instance of <see cref="ClientResponseContext"/> class. + /// </summary> + /// <param name="stream">Stream.</param> + /// <param name="marshaller">Marshaller.</param> + /// <param name="protocolVersion">Protocol version to be used for this response.</param> + public ClientResponseContext(IBinaryStream stream, Marshaller marshaller, ClientProtocolVersion protocolVersion) + : base(stream, marshaller, protocolVersion) + { + // No-op. + } + + /// <summary> + /// Reader. + /// </summary> + public BinaryReader Reader + { + get { return _reader ?? (_reader = Marshaller.StartUnmarshal(Stream)); } + } + } +} \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs index 7d1272d..cb3ee65 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs @@ -38,7 +38,7 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Wrapper over framework socket for Ignite thin client operations. /// </summary> - internal sealed class ClientSocket : IClientSocket + internal sealed class ClientSocket : IDisposable { /** Version 1.0.0. */ public static readonly ClientProtocolVersion Ver100 = new ClientProtocolVersion(1, 0, 0); @@ -118,6 +118,9 @@ namespace Apache.Ignite.Core.Impl.Client /** Logger. */ private readonly ILogger _logger; + /** Marshaller. */ + private readonly Marshaller _marsh; + /// <summary> /// Initializes a new instance of the <see cref="ClientSocket" /> class. /// </summary> @@ -126,13 +129,19 @@ namespace Apache.Ignite.Core.Impl.Client /// <param name="host">The host name (required for SSL).</param> /// <param name="version">Protocol version.</param> /// <param name="topVerCallback">Topology version update callback.</param> + /// <param name="marshaller">Marshaller.</param> public ClientSocket(IgniteClientConfiguration clientConfiguration, EndPoint endPoint, string host, - ClientProtocolVersion? version = null, - Action<AffinityTopologyVersion> topVerCallback = null) + ClientProtocolVersion? version, Action<AffinityTopologyVersion> topVerCallback, + Marshaller marshaller) { Debug.Assert(clientConfiguration != null); + Debug.Assert(endPoint != null); + Debug.Assert(!string.IsNullOrWhiteSpace(host)); + Debug.Assert(topVerCallback != null); + Debug.Assert(marshaller != null); _topVerCallback = topVerCallback; + _marsh = marshaller; _timeout = clientConfiguration.SocketTimeout; _logger = (clientConfiguration.Logger ?? NoopLogger.Instance).GetLogger(GetType()); @@ -185,8 +194,8 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Performs a send-receive operation. /// </summary> - public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction, - Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) + public T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) { // Encode. var reqMsg = WriteMessage(writeAction, opId); @@ -201,8 +210,8 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Performs a send-receive operation asynchronously. /// </summary> - public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction, - Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) + public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) { // Encode. var reqMsg = WriteMessage(writeAction, opId); @@ -308,7 +317,7 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Decodes the response that we got from <see cref="HandleResponse"/>. /// </summary> - private T DecodeResponse<T>(BinaryHeapStream stream, Func<IBinaryStream, T> readFunc, + private T DecodeResponse<T>(BinaryHeapStream stream, Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc) { ClientStatusCode statusCode; @@ -337,7 +346,9 @@ namespace Apache.Ignite.Core.Impl.Client if (statusCode == ClientStatusCode.Success) { - return readFunc != null ? readFunc(stream) : default(T); + return readFunc != null + ? readFunc(new ClientResponseContext(stream, _marsh, ServerVersion)) + : default(T); } var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString(); @@ -578,9 +589,9 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Writes the message to a byte array. /// </summary> - private RequestMessage WriteMessage(Action<IBinaryStream> writeAction, ClientOp opId) + private RequestMessage WriteMessage(Action<ClientRequestContext> writeAction, ClientOp opId) { - ValidateOp(opId); + ClientUtils.ValidateOp(opId, ServerVersion); var requestId = Interlocked.Increment(ref _requestId); @@ -592,7 +603,14 @@ namespace Apache.Ignite.Core.Impl.Client stream.WriteInt(0); // Reserve message size. stream.WriteShort((short) opId); stream.WriteLong(requestId); - writeAction(stream); + + if (writeAction != null) + { + var ctx = new ClientRequestContext(stream, _marsh, ServerVersion); + writeAction(ctx); + ctx.FinishMarshal(); + } + stream.WriteInt(0, stream.Position - 4); // Write message size. return new RequestMessage(requestId, stream.GetArray(), stream.Position); @@ -766,26 +784,6 @@ namespace Apache.Ignite.Core.Impl.Client } } } - - /// <summary> - /// Validates op code against current protocol version. - /// </summary> - /// <param name="opId">Op code.</param> - private void ValidateOp(ClientOp opId) - { - var minVersion = opId.GetMinVersion(); - - if (ServerVersion >= minVersion) - { - return; - } - - var message = string.Format("Operation {0} is not supported by protocol version {1}. " + - "Minimum protocol version required is {2}.", - opId, ServerVersion, minVersion); - - throw new IgniteClientException(message); - } /// <summary> /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientUtils.cs new file mode 100644 index 0000000..e9bec08 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientUtils.cs @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Client +{ + using Apache.Ignite.Core.Client; + + /// <summary> + /// Client utils. + /// </summary> + internal static class ClientUtils + { + /// <summary> + /// Validates op code against current protocol version. + /// </summary> + /// <param name="operation">Operation.</param> + /// <param name="protocolVersion">Protocol version.</param> + public static void ValidateOp(ClientOp operation, ClientProtocolVersion protocolVersion) + { + ValidateOp(operation, protocolVersion, operation.GetMinVersion()); + } + + /// <summary> + /// Validates op code against current protocol version. + /// </summary> + /// <param name="operation">Operation.</param> + /// <param name="protocolVersion">Protocol version.</param> + /// <param name="requiredProtocolVersion">Required protocol version.</param> + public static void ValidateOp<T>(T operation, ClientProtocolVersion protocolVersion, + ClientProtocolVersion requiredProtocolVersion) + { + if (protocolVersion >= requiredProtocolVersion) + { + return; + } + + var message = string.Format("Operation {0} is not supported by protocol version {1}. " + + "Minimum protocol version required is {2}.", + operation, protocolVersion, requiredProtocolVersion); + + throw new IgniteClientException(message); + } + } +} \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs index 4572a27..e0f7114 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cluster/ClientCluster.cs @@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster using System.Diagnostics; using Apache.Ignite.Core.Client; using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Common; /// <summary> @@ -33,33 +32,27 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster /** Ignite. */ private readonly IgniteClient _ignite; - /** Marshaller. */ - private readonly Marshaller _marsh; - /// <summary> /// Constructor. /// </summary> /// <param name="ignite">Ignite.</param> - /// <param name="marsh">Marshaller.</param> - public ClientCluster(IgniteClient ignite, Marshaller marsh) + public ClientCluster(IgniteClient ignite) { Debug.Assert(ignite != null); - Debug.Assert(marsh != null); _ignite = ignite; - _marsh = marsh; } /** <inheritdoc /> */ public void SetActive(bool isActive) { - DoOutInOp<object>(ClientOp.ClusterChangeState, w => w.WriteBoolean(isActive), null); + DoOutInOp<object>(ClientOp.ClusterChangeState, ctx => ctx.Stream.WriteBool(isActive), null); } /** <inheritdoc /> */ public bool IsActive() { - return DoOutInOp(ClientOp.ClusterIsActive, null, r => r.ReadBool()); + return DoOutInOp(ClientOp.ClusterIsActive, null, ctx => ctx.Stream.ReadBool()); } /** <inheritdoc /> */ @@ -67,12 +60,13 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster { IgniteArgumentCheck.NotNullOrEmpty(cacheName, "cacheName"); - Action<BinaryWriter> action = w => + Action<ClientRequestContext> action = ctx => { - w.WriteString(cacheName); - w.WriteBoolean(false); + ctx.Writer.WriteString(cacheName); + ctx.Writer.WriteBoolean(false); }; - return DoOutInOp(ClientOp.ClusterChangeWalState, action, r => r.ReadBool()); + + return DoOutInOp(ClientOp.ClusterChangeWalState, action, ctx => ctx.Stream.ReadBool()); } /** <inheritdoc /> */ @@ -80,12 +74,13 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster { IgniteArgumentCheck.NotNullOrEmpty(cacheName, "cacheName"); - Action<BinaryWriter> action = w => + Action<ClientRequestContext> action = ctx => { - w.WriteString(cacheName); - w.WriteBoolean(true); + ctx.Writer.WriteString(cacheName); + ctx.Writer.WriteBoolean(true); }; - return DoOutInOp(ClientOp.ClusterChangeWalState, action, r => r.ReadBool()); + + return DoOutInOp(ClientOp.ClusterChangeWalState, action, ctx => ctx.Stream.ReadBool()); } /** <inheritdoc /> */ @@ -93,33 +88,18 @@ namespace Apache.Ignite.Core.Impl.Client.Cluster { IgniteArgumentCheck.NotNullOrEmpty(cacheName, "cacheName"); - return DoOutInOp(ClientOp.ClusterGetWalState, w => w.WriteString(cacheName), r => r.ReadBool()); + return DoOutInOp(ClientOp.ClusterGetWalState, ctx => ctx.Writer.WriteString(cacheName), ctx => ctx.Stream.ReadBool()); } - + /// <summary> /// Does the out in op. /// </summary> - private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction, Func<IBinaryStream, T> readFunc) + private T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc) { - return _ignite.Socket.DoOutInOp(opId, stream => WriteRequest(writeAction, stream), - readFunc, HandleError<T>); + return _ignite.Socket.DoOutInOp(opId, writeAction, readFunc, HandleError<T>); } - - /// <summary> - /// Writes the request. - /// </summary> - private void WriteRequest(Action<BinaryWriter> writeAction, IBinaryStream stream) - { - if (writeAction != null) - { - var writer = _marsh.StartMarshal(stream); - - writeAction(writer); - - _marsh.FinishMarshal(writer); - } - } - + /// <summary> /// Handles the error. /// </summary> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs index b47fc12..8704d59 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs @@ -27,7 +27,6 @@ namespace Apache.Ignite.Core.Impl.Client using Apache.Ignite.Core.Client.Cache; using Apache.Ignite.Core.Datastream; using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Client.Cache; using Apache.Ignite.Core.Impl.Client.Cluster; using Apache.Ignite.Core.Impl.Cluster; @@ -106,7 +105,7 @@ namespace Apache.Ignite.Core.Impl.Client { IgniteArgumentCheck.NotNull(name, "name"); - DoOutOp(ClientOp.CacheGetOrCreateWithName, w => w.WriteString(name)); + DoOutOp(ClientOp.CacheGetOrCreateWithName, ctx => ctx.Writer.WriteString(name)); return GetCache<TK, TV>(name); } @@ -117,7 +116,7 @@ namespace Apache.Ignite.Core.Impl.Client IgniteArgumentCheck.NotNull(configuration, "configuration"); DoOutOp(ClientOp.CacheGetOrCreateWithConfiguration, - w => ClientCacheConfigurationSerializer.Write(w.Stream, configuration, ServerVersion)); + ctx => ClientCacheConfigurationSerializer.Write(ctx.Stream, configuration, ctx.ProtocolVersion)); return GetCache<TK, TV>(configuration.Name); } @@ -127,7 +126,7 @@ namespace Apache.Ignite.Core.Impl.Client { IgniteArgumentCheck.NotNull(name, "name"); - DoOutOp(ClientOp.CacheCreateWithName, w => w.WriteString(name)); + DoOutOp(ClientOp.CacheCreateWithName, ctx => ctx.Writer.WriteString(name)); return GetCache<TK, TV>(name); } @@ -138,7 +137,7 @@ namespace Apache.Ignite.Core.Impl.Client IgniteArgumentCheck.NotNull(configuration, "configuration"); DoOutOp(ClientOp.CacheCreateWithConfiguration, - w => ClientCacheConfigurationSerializer.Write(w.Stream, configuration, ServerVersion)); + ctx => ClientCacheConfigurationSerializer.Write(ctx.Stream, configuration, ctx.ProtocolVersion)); return GetCache<TK, TV>(configuration.Name); } @@ -146,13 +145,13 @@ namespace Apache.Ignite.Core.Impl.Client /** <inheritDoc /> */ public ICollection<string> GetCacheNames() { - return DoOutInOp(ClientOp.CacheGetNames, null, s => Marshaller.StartUnmarshal(s).ReadStringCollection()); + return DoOutInOp(ClientOp.CacheGetNames, null, ctx => ctx.Reader.ReadStringCollection()); } /** <inheritDoc /> */ public IClientCluster GetCluster() { - return new ClientCluster(this, _marsh); + return new ClientCluster(this); } /** <inheritDoc /> */ @@ -160,7 +159,7 @@ namespace Apache.Ignite.Core.Impl.Client { IgniteArgumentCheck.NotNull(name, "name"); - DoOutOp(ClientOp.CacheDestroy, w => w.WriteInt(BinaryUtils.GetCacheId(name))); + DoOutOp(ClientOp.CacheDestroy, ctx => ctx.Stream.WriteInt(BinaryUtils.GetCacheId(name))); } /** <inheritDoc /> */ @@ -243,14 +242,6 @@ namespace Apache.Ignite.Core.Impl.Client } /// <summary> - /// Gets the protocol version supported by server. - /// </summary> - public ClientProtocolVersion ServerVersion - { - get { return _socket.ServerVersion; } - } - - /// <summary> /// Gets the client not supported exception. /// </summary> public static NotSupportedException GetClientNotSupportedException(string info = null) @@ -268,26 +259,16 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Does the out in op. /// </summary> - private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction, - Func<IBinaryStream, T> readFunc) + private T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction, + Func<ClientResponseContext, T> readFunc) { - return _socket.DoOutInOp(opId, stream => - { - if (writeAction != null) - { - var writer = _marsh.StartMarshal(stream); - - writeAction(writer); - - _marsh.FinishMarshal(writer); - } - }, readFunc); + return _socket.DoOutInOp(opId, writeAction, readFunc); } /// <summary> /// Does the out op. /// </summary> - private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null) + private void DoOutOp(ClientOp opId, Action<ClientRequestContext> writeAction = null) { DoOutInOp<object>(opId, writeAction, null); } diff --git a/modules/platforms/dotnet/DEVNOTES.txt b/modules/platforms/dotnet/DEVNOTES.txt index 189f04e..80997ef 100644 --- a/modules/platforms/dotnet/DEVNOTES.txt +++ b/modules/platforms/dotnet/DEVNOTES.txt @@ -66,10 +66,11 @@ Requirements: * NuGet: sudo apt-get install nuget * JDK: sudo apt-get install default-jdk * Maven: sudo apt-get install maven +* PowerShell Core: https://github.com/PowerShell/PowerShell * IDE: Not required. Rider is recommended, MonoDevelop also works. Getting started: * Build Java and .NET: - ./build-mono.sh + pwsh ./build.ps1 -skipDotNetCore * Run tests: mono Apache.Ignite.Core.Tests/bin/Debug/Apache.Ignite.Core.Tests.exe -basicTests diff --git a/modules/platforms/dotnet/build.ps1 b/modules/platforms/dotnet/build.ps1 index d727042..14f417d 100644 --- a/modules/platforms/dotnet/build.ps1 +++ b/modules/platforms/dotnet/build.ps1 @@ -178,7 +178,7 @@ if ((Get-Command $ng -ErrorAction SilentlyContinue) -eq $null) { if (-not (Test-Path $ng)) { echo "Downloading NuGet..." - (New-Object System.Net.WebClient).DownloadFile("https://dist.nuget.org/win-x86-commandline/v3.3.0/nuget.exe", "nuget.exe") + (New-Object System.Net.WebClient).DownloadFile("https://dist.nuget.org/win-x86-commandline/v3.3.0/nuget.exe", "$PSScriptRoot\nuget.exe") } } diff --git a/modules/platforms/dotnet/release/Program.cs b/modules/platforms/dotnet/release/Program.cs index 66212e6..6c94df4 100644 --- a/modules/platforms/dotnet/release/Program.cs +++ b/modules/platforms/dotnet/release/Program.cs @@ -22,6 +22,7 @@ using System.Threading.Tasks; using Apache.Ignite.Core; using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Client; +using Apache.Ignite.Core.Configuration; using Apache.Ignite.Core.Discovery.Tcp; using Apache.Ignite.Core.Discovery.Tcp.Static; using Apache.Ignite.Linq; @@ -47,7 +48,12 @@ namespace test_proj Endpoints = new[] {"127.0.0.1:47500"} }, SocketTimeout = TimeSpan.FromSeconds(0.3) - } + }, + ClientConnectorConfiguration = new ClientConnectorConfiguration + { + Port = 10842 + }, + Localhost = "127.0.0.1" }; using (var ignite = Ignition.Start(cfg)) @@ -67,7 +73,8 @@ namespace test_proj .Single(); Debug.Assert(1 == resPerson.Age); - using (var igniteThin = Ignition.StartClient(new IgniteClientConfiguration("127.0.0.1"))) + var clientCfg = new IgniteClientConfiguration("127.0.0.1:10842"); + using (var igniteThin = Ignition.StartClient(clientCfg)) { var cacheThin = igniteThin.GetCache<int, Person>(cacheCfg.Name); var personThin = await cacheThin.GetAsync(1);
