http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs index 47b780d..818a7f6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs @@ -15,17 +15,18 @@ * limitations under the License. */ +// ReSharper disable UnusedParameter.Global namespace Apache.Ignite.Core.Client.Cache { using System; using System.Collections.Generic; + using System.Threading.Tasks; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Query; /// <summary> /// Client cache API. See <see cref="IIgniteClient.GetCache{K, V}"/>. /// </summary> - // ReSharper disable once TypeParameterCanBeVariant (ICache shoul not be variant, more methods will be added) public interface ICacheClient<TK, TV> { /// <summary> @@ -44,6 +45,16 @@ namespace Apache.Ignite.Core.Client.Cache void Put(TK key, TV val); /// <summary> + /// Associates the specified value with the specified key in the cache. + /// <para /> + /// If the cache previously contained a mapping for the key, + /// the old value is replaced by the specified value. + /// </summary> + /// <param name="key">Key with which the specified value is to be associated.</param> + /// <param name="val">Value to be associated with the specified key.</param> + Task PutAsync(TK key, TV val); + + /// <summary> /// Retrieves value mapped to the specified key from cache. /// </summary> /// <param name="key">Key.</param> @@ -55,6 +66,14 @@ namespace Apache.Ignite.Core.Client.Cache /// Retrieves value mapped to the specified key from cache. /// </summary> /// <param name="key">Key.</param> + /// <returns>Value.</returns> + /// <exception cref="KeyNotFoundException">If the key is not present in the cache.</exception> + Task<TV> GetAsync(TK key); + + /// <summary> + /// Retrieves value mapped to the specified key from cache. + /// </summary> + /// <param name="key">Key.</param> /// <param name="value">When this method returns, the value associated with the specified key, /// if the key is found; otherwise, the default value for the type of the value parameter. /// This parameter is passed uninitialized.</param> @@ -64,6 +83,15 @@ namespace Apache.Ignite.Core.Client.Cache bool TryGet(TK key, out TV value); /// <summary> + /// Retrieves value mapped to the specified key from cache. + /// </summary> + /// <param name="key">Key.</param> + /// <returns> + /// <see cref="CacheResult{T}"/> containing a bool success flag and a value. + /// </returns> + Task<CacheResult<TV>> TryGetAsync(TK key); + + /// <summary> /// Retrieves values mapped to the specified keys from cache. /// </summary> /// <param name="keys">Keys.</param> @@ -71,6 +99,13 @@ namespace Apache.Ignite.Core.Client.Cache ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys); /// <summary> + /// Retrieves values mapped to the specified keys from cache. + /// </summary> + /// <param name="keys">Keys.</param> + /// <returns>Map of key-value pairs.</returns> + Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys); + + /// <summary> /// Gets or sets a cache value with the specified key. /// Shortcut to <see cref="Get"/> and <see cref="Put"/> /// </summary> @@ -87,6 +122,13 @@ namespace Apache.Ignite.Core.Client.Cache bool ContainsKey(TK key); /// <summary> + /// Check if cache contains mapping for this key. + /// </summary> + /// <param name="key">Key.</param> + /// <returns>True if cache contains mapping for this key.</returns> + Task<bool> ContainsKeyAsync(TK key); + + /// <summary> /// Check if cache contains mapping for these keys. /// </summary> /// <param name="keys">Keys.</param> @@ -94,6 +136,13 @@ namespace Apache.Ignite.Core.Client.Cache bool ContainsKeys(IEnumerable<TK> keys); /// <summary> + /// Check if cache contains mapping for these keys. + /// </summary> + /// <param name="keys">Keys.</param> + /// <returns>True if cache contains mapping for all these keys.</returns> + Task<bool> ContainsKeysAsync(IEnumerable<TK> keys); + + /// <summary> /// Executes a Scan query. /// </summary> /// <param name="scanQuery">Scan query.</param> @@ -126,6 +175,17 @@ namespace Apache.Ignite.Core.Client.Cache CacheResult<TV> GetAndPut(TK key, TV val); /// <summary> + /// Associates the specified value with the specified key in this cache, + /// returning an existing value if one existed. + /// </summary> + /// <param name="key">Key with which the specified value is to be associated.</param> + /// <param name="val">Value to be associated with the specified key.</param> + /// <returns> + /// The value associated with the key at the start of the operation. + /// </returns> + Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val); + + /// <summary> /// Atomically replaces the value for a given key if and only if there is a value currently mapped by the key. /// </summary> /// <param name="key">Key with which the specified value is to be associated.</param> @@ -136,6 +196,16 @@ namespace Apache.Ignite.Core.Client.Cache CacheResult<TV> GetAndReplace(TK key, TV val); /// <summary> + /// Atomically replaces the value for a given key if and only if there is a value currently mapped by the key. + /// </summary> + /// <param name="key">Key with which the specified value is to be associated.</param> + /// <param name="val">Value to be associated with the specified key.</param> + /// <returns> + /// The previous value associated with the specified key. + /// </returns> + Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val); + + /// <summary> /// Atomically removes the entry for a key only if currently mapped to some value. /// </summary> /// <param name="key">Key with which the specified value is associated.</param> @@ -143,6 +213,13 @@ namespace Apache.Ignite.Core.Client.Cache CacheResult<TV> GetAndRemove(TK key); /// <summary> + /// Atomically removes the entry for a key only if currently mapped to some value. + /// </summary> + /// <param name="key">Key with which the specified value is associated.</param> + /// <returns>The value if one existed.</returns> + Task<CacheResult<TV>> GetAndRemoveAsync(TK key); + + /// <summary> /// Atomically associates the specified key with the given value if it is not already associated with a value. /// </summary> /// <param name="key">Key with which the specified value is to be associated.</param> @@ -151,6 +228,14 @@ namespace Apache.Ignite.Core.Client.Cache bool PutIfAbsent(TK key, TV val); /// <summary> + /// Atomically associates the specified key with the given value if it is not already associated with a value. + /// </summary> + /// <param name="key">Key with which the specified value is to be associated.</param> + /// <param name="val">Value to be associated with the specified key.</param> + /// <returns>True if a value was set.</returns> + Task<bool> PutIfAbsentAsync(TK key, TV val); + + /// <summary> /// Stores given key-value pair in cache only if cache had no previous mapping for it. /// </summary> /// <param name="key">Key to store in cache.</param> @@ -161,6 +246,16 @@ namespace Apache.Ignite.Core.Client.Cache CacheResult<TV> GetAndPutIfAbsent(TK key, TV val); /// <summary> + /// Stores given key-value pair in cache only if cache had no previous mapping for it. + /// </summary> + /// <param name="key">Key to store in cache.</param> + /// <param name="val">Value to be associated with the given key.</param> + /// <returns> + /// Previously contained value regardless of whether put happened or not. + /// </returns> + Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val); + + /// <summary> /// Stores given key-value pair in cache only if there is a previous mapping for it. /// </summary> /// <param name="key">Key to store in cache.</param> @@ -169,6 +264,14 @@ namespace Apache.Ignite.Core.Client.Cache bool Replace(TK key, TV val); /// <summary> + /// Stores given key-value pair in cache only if there is a previous mapping for it. + /// </summary> + /// <param name="key">Key to store in cache.</param> + /// <param name="val">Value to be associated with the given key.</param> + /// <returns>True if the value was replaced.</returns> + Task<bool> ReplaceAsync(TK key, TV val); + + /// <summary> /// Stores given key-value pair in cache only if only if the previous value is equal to the /// old value passed as argument. /// </summary> @@ -179,29 +282,62 @@ namespace Apache.Ignite.Core.Client.Cache bool Replace(TK key, TV oldVal, TV newVal); /// <summary> + /// Stores given key-value pair in cache only if only if the previous value is equal to the + /// old value passed as argument. + /// </summary> + /// <param name="key">Key to store in cache.</param> + /// <param name="oldVal">Old value to match.</param> + /// <param name="newVal">Value to be associated with the given key.</param> + /// <returns>True if replace happened, false otherwise.</returns> + Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal); + + /// <summary> /// Stores given key-value pairs in cache. /// </summary> /// <param name="vals">Key-value pairs to store in cache.</param> void PutAll(IEnumerable<KeyValuePair<TK, TV>> vals); /// <summary> + /// Stores given key-value pairs in cache. + /// </summary> + /// <param name="vals">Key-value pairs to store in cache.</param> + Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals); + + /// <summary> /// Clears the contents of the cache, without notifying listeners or CacheWriters. /// </summary> void Clear(); /// <summary> + /// Clears the contents of the cache, without notifying listeners or CacheWriters. + /// </summary> + Task ClearAsync(); + + /// <summary> /// Clear entry from the cache, without notifying listeners or CacheWriters. /// </summary> /// <param name="key">Key to clear.</param> void Clear(TK key); /// <summary> + /// Clear entry from the cache, without notifying listeners or CacheWriters. + /// </summary> + /// <param name="key">Key to clear.</param> + Task ClearAsync(TK key); + + /// <summary> /// Clear entries from the cache, without notifying listeners or CacheWriters. /// </summary> /// <param name="keys">Keys to clear.</param> void ClearAll(IEnumerable<TK> keys); /// <summary> + /// Clear entries from the cache, without notifying listeners or CacheWriters. + /// </summary> + /// <param name="keys">Keys to clear.</param> + Task ClearAllAsync(IEnumerable<TK> keys); + + /// <summary> /// Removes given key mapping from cache, notifying listeners and cache writers. /// </summary> /// <param name="key">Key to remove.</param> @@ -209,6 +345,13 @@ namespace Apache.Ignite.Core.Client.Cache bool Remove(TK key); /// <summary> + /// Removes given key mapping from cache, notifying listeners and cache writers. + /// </summary> + /// <param name="key">Key to remove.</param> + /// <returns>True if entry was removed, false otherwise.</returns> + Task<bool> RemoveAsync(TK key); + + /// <summary> /// Removes given key mapping from cache if one exists and value is equal to the passed in value. /// </summary> /// <param name="key">Key whose mapping is to be removed from cache.</param> @@ -217,17 +360,36 @@ namespace Apache.Ignite.Core.Client.Cache bool Remove(TK key, TV val); /// <summary> + /// Removes given key mapping from cache if one exists and value is equal to the passed in value. + /// </summary> + /// <param name="key">Key whose mapping is to be removed from cache.</param> + /// <param name="val">Value to match against currently cached value.</param> + /// <returns>True if entry was removed, false otherwise.</returns> + Task<bool> RemoveAsync(TK key, TV val); + + /// <summary> /// Removes given key mappings from cache, notifying listeners and cache writers. /// </summary> /// <param name="keys">Keys to be removed from cache.</param> void RemoveAll(IEnumerable<TK> keys); /// <summary> + /// Removes given key mappings from cache, notifying listeners and cache writers. + /// </summary> + /// <param name="keys">Keys to be removed from cache.</param> + Task RemoveAllAsync(IEnumerable<TK> keys); + + /// <summary> /// Removes all mappings from cache, notifying listeners and cache writers. /// </summary> void RemoveAll(); /// <summary> + /// Removes all mappings from cache, notifying listeners and cache writers. + /// </summary> + Task RemoveAllAsync(); + + /// <summary> /// Gets the number of all entries cached across all nodes. /// <para /> /// NOTE: this operation is distributed and will query all participating nodes for their cache sizes. @@ -237,6 +399,15 @@ namespace Apache.Ignite.Core.Client.Cache long GetSize(params CachePeekMode[] modes); /// <summary> + /// Gets the number of all entries cached across all nodes. + /// <para /> + /// NOTE: this operation is distributed and will query all participating nodes for their cache sizes. + /// </summary> + /// <param name="modes">Optional peek modes. If not provided, then total cache size is returned.</param> + /// <returns>Cache size across all nodes.</returns> + Task<long> GetSizeAsync(params CachePeekMode[] modes); + + /// <summary> /// Gets the cache configuration. /// </summary> CacheClientConfiguration GetConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs index 58f12fe..2b24aa4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs @@ -27,6 +27,8 @@ namespace Apache.Ignite.Core.Client /// Main entry point for Ignite Thin Client APIs. /// You can obtain an instance of <see cref="IIgniteClient"/> through one of the /// <see cref="Ignition.StartClient()"/> overloads. + /// <para /> + /// Instances of this class and all nested APIs are thread safe. /// </summary> public interface IIgniteClient : IDisposable { http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs index e46ede4..e20666f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs @@ -17,6 +17,7 @@ namespace Apache.Ignite.Core.Client { + using System; using System.ComponentModel; using System.Xml; using Apache.Ignite.Core.Binary; @@ -48,6 +49,11 @@ namespace Apache.Ignite.Core.Client public const bool DefaultTcpNoDelay = true; /// <summary> + /// Default socket timeout. + /// </summary> + public static readonly TimeSpan DefaultSocketTimeout = TimeSpan.FromMilliseconds(5000); + + /// <summary> /// Initializes a new instance of the <see cref="IgniteClientConfiguration"/> class. /// </summary> public IgniteClientConfiguration() @@ -56,6 +62,7 @@ namespace Apache.Ignite.Core.Client SocketSendBufferSize = DefaultSocketBufferSize; SocketReceiveBufferSize = DefaultSocketBufferSize; TcpNoDelay = DefaultTcpNoDelay; + SocketTimeout = DefaultSocketTimeout; } /// <summary> @@ -74,6 +81,7 @@ namespace Apache.Ignite.Core.Client SocketSendBufferSize = cfg.SocketSendBufferSize; SocketReceiveBufferSize = cfg.SocketReceiveBufferSize; TcpNoDelay = cfg.TcpNoDelay; + SocketTimeout = cfg.SocketTimeout; if (cfg.BinaryConfiguration != null) { @@ -107,6 +115,12 @@ namespace Apache.Ignite.Core.Client public int SocketReceiveBufferSize { get; set; } /// <summary> + /// Gets or sets the socket operation timeout. Zero or negative means infinite timeout. + /// </summary> + [DefaultValue(typeof(TimeSpan), "00:00:05")] + public TimeSpan SocketTimeout { get; set; } + + /// <summary> /// Gets or sets the value for <c>TCP_NODELAY</c> socket option. Each /// socket will be opened using provided value. /// <para /> http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd index e7a6889..f71ce0b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd @@ -198,6 +198,11 @@ <xs:documentation>Value for TCP_NODELAY socket option.</xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="socketTimeout" type="xs:string"> + <xs:annotation> + <xs:documentation>Socket operation timeout. Zero or negative for infinite timeout.</xs:documentation> + </xs:annotation> + </xs:attribute> </xs:complexType> </xs:element> </xs:schema> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs index a6082f1..6bd03fd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs @@ -356,7 +356,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO /// </summary> /// <param name="data">Data pointer.</param> /// <returns>Int value</returns> - private static int ReadInt0(byte* data) { + public static int ReadInt0(byte* data) { int val; if (LittleEndian) http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs index 2344417..8138b77 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; + using System.Threading.Tasks; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; @@ -99,6 +100,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task<TV> GetAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalNotNull<TV>); + } + + /** <inheritDoc /> */ public bool TryGet(TK key, out TV value) { IgniteArgumentCheck.NotNull(key, "key"); @@ -111,24 +120,27 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task<CacheResult<TV>> TryGetAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalCacheResult<TV>); + } + + /** <inheritDoc /> */ public ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys) { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), stream => - { - var reader = _marsh.StartUnmarshal(stream, _keepBinary); - - var cnt = reader.ReadInt(); - var res = new List<ICacheEntry<TK, TV>>(cnt); + return DoOutInOp(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s)); + } - for (var i = 0; i < cnt; i++) - { - res.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>())); - } + /** <inheritDoc /> */ + public Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); - return res; - }); + return DoOutInOpAsync(ClientOp.CacheGetAll, w => w.WriteEnumerable(keys), s => ReadCacheEntries(s)); } /** <inheritDoc /> */ @@ -137,11 +149,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - DoOutOp(ClientOp.CachePut, w => - { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }); + DoOutOp(ClientOp.CachePut, w => WriteKeyVal(w, key, val)); + } + + /** <inheritDoc /> */ + public Task PutAsync(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutOpAsync(ClientOp.CachePut, w => WriteKeyVal(w, key, val)); } /** <inheritDoc /> */ @@ -153,6 +170,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task<bool> ContainsKeyAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOpAsync(ClientOp.CacheContainsKey, w => w.WriteObjectDetached(key), r => r.ReadBool()); + } + + /** <inheritDoc /> */ public bool ContainsKeys(IEnumerable<TK> keys) { IgniteArgumentCheck.NotNull(keys, "keys"); @@ -161,6 +186,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + return DoOutInOpAsync(ClientOp.CacheContainsKeys, w => w.WriteEnumerable(keys), r => r.ReadBool()); + } + + /** <inheritDoc /> */ public IQueryCursor<ICacheEntry<TK, TV>> Query(ScanQuery<TK, TV> scanQuery) { IgniteArgumentCheck.NotNull(scanQuery, "scanQuery"); @@ -209,11 +242,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOp(ClientOp.CacheGetAndPut, w => - { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }, UnmarshalCacheResult<TV>); + return DoOutInOp(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>); + } + + /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOpAsync(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>); } /** <inheritDoc /> */ @@ -222,11 +260,16 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOp(ClientOp.CacheGetAndReplace, w => - { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }, UnmarshalCacheResult<TV>); + return DoOutInOp(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>); + } + + /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOpAsync(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>); } /** <inheritDoc /> */ @@ -239,16 +282,30 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndRemoveAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOpAsync(ClientOp.CacheGetAndRemove, w => w.WriteObjectDetached(key), + UnmarshalCacheResult<TV>); + } + + /** <inheritDoc /> */ public bool PutIfAbsent(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOp(ClientOp.CachePutIfAbsent, w => - { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }, s => s.ReadBool()); + return DoOutInOp(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool()); + } + + /** <inheritDoc /> */ + public Task<bool> PutIfAbsentAsync(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOpAsync(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool()); } /** <inheritDoc /> */ @@ -257,11 +314,18 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOp(ClientOp.CacheGetAndPutIfAbsent, w => - { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }, UnmarshalCacheResult<TV>); + return DoOutInOp(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val), + UnmarshalCacheResult<TV>); + } + + /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOpAsync(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val), + UnmarshalCacheResult<TV>); } /** <inheritDoc /> */ @@ -270,21 +334,41 @@ namespace Apache.Ignite.Core.Impl.Client.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOp(ClientOp.CacheReplace, w => + return DoOutInOp(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool()); + } + + /** <inheritDoc /> */ + public Task<bool> ReplaceAsync(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOpAsync(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool()); + } + + /** <inheritDoc /> */ + public bool Replace(TK key, TV oldVal, TV newVal) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(oldVal, "oldVal"); + IgniteArgumentCheck.NotNull(newVal, "newVal"); + + return DoOutInOp(ClientOp.CacheReplaceIfEquals, w => { w.WriteObjectDetached(key); - w.WriteObjectDetached(val); + w.WriteObjectDetached(oldVal); + w.WriteObjectDetached(newVal); }, s => s.ReadBool()); } /** <inheritDoc /> */ - public bool Replace(TK key, TV oldVal, TV newVal) + public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal) { IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(oldVal, "oldVal"); IgniteArgumentCheck.NotNull(newVal, "newVal"); - return DoOutInOp(ClientOp.CacheReplaceIfEquals, w => + return DoOutInOpAsync(ClientOp.CacheReplaceIfEquals, w => { w.WriteObjectDetached(key); w.WriteObjectDetached(oldVal); @@ -301,12 +385,26 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals) + { + IgniteArgumentCheck.NotNull(vals, "vals"); + + return DoOutOpAsync(ClientOp.CachePutAll, w => w.WriteDictionary(vals)); + } + + /** <inheritDoc /> */ public void Clear() { DoOutOp(ClientOp.CacheClear); } /** <inheritDoc /> */ + public Task ClearAsync() + { + return DoOutOpAsync(ClientOp.CacheClear); + } + + /** <inheritDoc /> */ public void Clear(TK key) { IgniteArgumentCheck.NotNull(key, "key"); @@ -315,6 +413,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task ClearAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutOpAsync(ClientOp.CacheClearKey, w => w.WriteObjectDetached(key)); + } + + /** <inheritDoc /> */ public void ClearAll(IEnumerable<TK> keys) { IgniteArgumentCheck.NotNull(keys, "keys"); @@ -323,6 +429,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task ClearAllAsync(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + return DoOutOpAsync(ClientOp.CacheClearKeys, w => w.WriteEnumerable(keys)); + } + + /** <inheritDoc /> */ public bool Remove(TK key) { IgniteArgumentCheck.NotNull(key, "key"); @@ -331,16 +445,29 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task<bool> RemoveAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOpAsync(ClientOp.CacheRemoveKey, w => w.WriteObjectDetached(key), r => r.ReadBool()); + } + + /** <inheritDoc /> */ public bool Remove(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOp(ClientOp.CacheRemoveIfEquals, w => - { - w.WriteObjectDetached(key); - w.WriteObjectDetached(val); - }, r => r.ReadBool()); + return DoOutInOp(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool()); + } + + /** <inheritDoc /> */ + public Task<bool> RemoveAsync(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOpAsync(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool()); } /** <inheritDoc /> */ @@ -352,18 +479,38 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /** <inheritDoc /> */ + public Task RemoveAllAsync(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + return DoOutOpAsync(ClientOp.CacheRemoveKeys, w => w.WriteEnumerable(keys)); + } + + /** <inheritDoc /> */ public void RemoveAll() { DoOutOp(ClientOp.CacheRemoveAll); } /** <inheritDoc /> */ + public Task RemoveAllAsync() + { + return DoOutOpAsync(ClientOp.CacheRemoveAll); + } + + /** <inheritDoc /> */ public long GetSize(params CachePeekMode[] modes) { return DoOutInOp(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong()); } /** <inheritDoc /> */ + public Task<long> GetSizeAsync(params CachePeekMode[] modes) + { + return DoOutInOpAsync(ClientOp.CacheGetSize, w => WritePeekModes(modes, w), s => s.ReadLong()); + } + + /** <inheritDoc /> */ public CacheClientConfiguration GetConfiguration() { return DoOutInOp(ClientOp.CacheGetConfiguration, null, s => new CacheClientConfiguration(s)); @@ -405,33 +552,57 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } /// <summary> + /// Does the out op. + /// </summary> + private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null) + { + DoOutInOp<object>(opId, writeAction, null); + } + + /// <summary> + /// Does the out op. + /// </summary> + private Task DoOutOpAsync(ClientOp opId, Action<BinaryWriter> writeAction = null) + { + return DoOutInOpAsync<object>(opId, writeAction, null); + } + + /// <summary> /// Does the out in op. /// </summary> private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction, Func<IBinaryStream, T> readFunc) { - return _ignite.Socket.DoOutInOp(opId, stream => - { - stream.WriteInt(_id); - stream.WriteByte(0); // Flags (skipStore, etc). - - if (writeAction != null) - { - var writer = _marsh.StartMarshal(stream); - - writeAction(writer); + return _ignite.Socket.DoOutInOp(opId, stream => WriteRequest(writeAction, stream), + readFunc, HandleError<T>); + } - _marsh.FinishMarshal(writer); - } - }, readFunc, HandleError<T>); + /// <summary> + /// Does the out in op. + /// </summary> + private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<BinaryWriter> writeAction, + Func<IBinaryStream, T> readFunc) + { + return _ignite.Socket.DoOutInOpAsync(opId, stream => WriteRequest(writeAction, stream), + readFunc, HandleError<T>); } /// <summary> - /// Does the out op. + /// Writes the request. /// </summary> - private void DoOutOp(ClientOp opId, Action<BinaryWriter> writeAction = null) + private void WriteRequest(Action<BinaryWriter> writeAction, IBinaryStream stream) { - DoOutInOp<object>(opId, writeAction, null); + stream.WriteInt(_id); + stream.WriteByte(0); // Flags (skipStore, etc). + + if (writeAction != null) + { + var writer = _marsh.StartMarshal(stream); + + writeAction(writer); + + _marsh.FinishMarshal(writer); + } } /// <summary> @@ -618,5 +789,32 @@ namespace Apache.Ignite.Core.Impl.Client.Cache } } } + + /// <summary> + /// Reads the cache entries. + /// </summary> + private ICollection<ICacheEntry<TK, TV>> ReadCacheEntries(IBinaryStream stream) + { + var reader = _marsh.StartUnmarshal(stream, _keepBinary); + + var cnt = reader.ReadInt(); + var res = new List<ICacheEntry<TK, TV>>(cnt); + + for (var i = 0; i < cnt; i++) + { + res.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>())); + } + + return res; + } + + /// <summary> + /// Writes key and value. + /// </summary> + private static void WriteKeyVal(BinaryWriter w, TK key, TV val) + { + w.WriteObjectDetached(key); + w.WriteObjectDetached(val); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1c09a923/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs index b8218c1..8e19df5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Core.Impl.Client { using System; + using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; @@ -25,6 +26,7 @@ namespace Apache.Ignite.Core.Impl.Client using System.Net; using System.Net.Sockets; using System.Threading; + using System.Threading.Tasks; using Apache.Ignite.Core.Client; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Binary; @@ -33,7 +35,7 @@ namespace Apache.Ignite.Core.Impl.Client /// <summary> /// Wrapper over framework socket for Ignite thin client operations. /// </summary> - internal class ClientSocket : IDisposable + internal sealed class ClientSocket : IDisposable { /** Current version. */ private static readonly ClientProtocolVersion CurrentProtocolVersion = new ClientProtocolVersion(1, 0, 0); @@ -44,12 +46,41 @@ namespace Apache.Ignite.Core.Impl.Client /** Client type code. */ private const byte ClientType = 2; - /** Unerlying socket. */ + /** Underlying socket. */ private readonly Socket _socket; - /** */ + /** Operation timeout. */ + private readonly TimeSpan _timeout; + + /** Request timeout checker. */ + private readonly Timer _timeoutCheckTimer; + + /** Callback checker guard. */ + private volatile bool _checkingTimeouts; + + /** Current async operations, map from request id. */ + private readonly ConcurrentDictionary<long, Request> _requests + = new ConcurrentDictionary<long, Request>(); + + /** Request id generator. */ private long _requestId; + /** Socket failure exception. */ + private volatile Exception _exception; + + /** Locker. */ + private readonly ReaderWriterLockSlim _sendRequestLock = + new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); + + /** Background socket receiver trigger. */ + private readonly ManualResetEventSlim _listenerEvent = new ManualResetEventSlim(); + + /** Dispose locker. */ + private readonly object _disposeSyncRoot = new object(); + + /** Disposed flag. */ + private bool _isDisposed; + /// <summary> /// Initializes a new instance of the <see cref="ClientSocket" /> class. /// </summary> @@ -59,9 +90,21 @@ namespace Apache.Ignite.Core.Impl.Client { Debug.Assert(clientConfiguration != null); + _timeout = clientConfiguration.SocketTimeout; + _socket = Connect(clientConfiguration); - Handshake(_socket, version ?? CurrentProtocolVersion); + Handshake(version ?? CurrentProtocolVersion); + + // Check periodically if any request has timed out. + if (_timeout > TimeSpan.Zero) + { + // Minimum Socket timeout is 500ms. + _timeoutCheckTimer = new Timer(CheckTimeouts, null, _timeout, TimeSpan.FromMilliseconds(500)); + } + + // Continuously and asynchronously wait for data from server. + Task.Factory.StartNew(WaitForMessages); } /// <summary> @@ -70,48 +113,120 @@ namespace Apache.Ignite.Core.Impl.Client public T DoOutInOp<T>(ClientOp opId, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) { - var requestId = Interlocked.Increment(ref _requestId); + // Encode. + var reqMsg = WriteMessage(writeAction, opId); + + // Send. + var response = SendRequest(ref reqMsg); + + // Decode. + return DecodeResponse(response, readFunc, errorFunc); + } - var resBytes = SendReceive(_socket, stream => - { - stream.WriteShort((short) opId); - stream.WriteLong(requestId); + /// <summary> + /// Performs a send-receive operation asynchronously. + /// </summary> + public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction, + Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null) + { + // Encode. + var reqMsg = WriteMessage(writeAction, opId); + + // Send. + var task = SendRequestAsync(ref reqMsg); + + // Decode. + return task.ContinueWith(responseTask => DecodeResponse(responseTask.Result, readFunc, errorFunc)); + } - if (writeAction != null) + /// <summary> + /// Starts waiting for the new message. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] + private void WaitForMessages() + { + try + { + // Null exception means active socket. + while (_exception == null) { - writeAction(stream); + // Do not call Receive if there are no async requests pending. + while (_requests.IsEmpty) + { + // Wait with a timeout so we check for disposed state periodically. + _listenerEvent.Wait(1000); + + if (_exception != null) + { + return; + } + + _listenerEvent.Reset(); + } + + var msg = ReceiveMessage(); + HandleResponse(msg); } - }); + } + catch (Exception ex) + { + // Socket failure (connection dropped, etc). + // Close socket and all pending requests. + // Note that this does not include request decoding exceptions (failed request, invalid data, etc). + _exception = ex; + Dispose(); + } + } + + /// <summary> + /// Handles the response. + /// </summary> + private void HandleResponse(byte[] response) + { + var stream = new BinaryHeapStream(response); + var requestId = stream.ReadLong(); - using (var stream = new BinaryHeapStream(resBytes)) + Request req; + if (!_requests.TryRemove(requestId, out req)) { - var resRequestId = stream.ReadLong(); - Debug.Assert(requestId == resRequestId); + // Response with unknown id. + throw new IgniteClientException("Invalid thin client response id: " + requestId); + } - var statusCode = (ClientStatusCode) stream.ReadInt(); + req.CompletionSource.TrySetResult(stream); + } - if (statusCode == ClientStatusCode.Success) - { - return readFunc != null ? readFunc(stream) : default(T); - } + /// <summary> + /// Decodes the response that we got from <see cref="HandleResponse"/>. + /// </summary> + private static T DecodeResponse<T>(BinaryHeapStream stream, Func<IBinaryStream, T> readFunc, + Func<ClientStatusCode, string, T> errorFunc) + { + var statusCode = (ClientStatusCode)stream.ReadInt(); - var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString(); + if (statusCode == ClientStatusCode.Success) + { + return readFunc != null ? readFunc(stream) : default(T); + } - if (errorFunc != null) - { - return errorFunc(statusCode, msg); - } + var msg = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString(); - throw new IgniteClientException(msg, null, statusCode); + if (errorFunc != null) + { + return errorFunc(statusCode, msg); } + + throw new IgniteClientException(msg, null, statusCode); } /// <summary> /// Performs client protocol handshake. /// </summary> - private static void Handshake(Socket sock, ClientProtocolVersion version) + private void Handshake(ClientProtocolVersion version) { - var res = SendReceive(sock, stream => + // Send request. + int messageLen; + var buf = WriteMessage(stream => { // Handshake. stream.WriteByte(OpHandshake); @@ -123,7 +238,15 @@ namespace Apache.Ignite.Core.Impl.Client // Client type: platform. stream.WriteByte(ClientType); - }, 20); + }, 12, out messageLen); + + Debug.Assert(messageLen == 12); + + var sent = _socket.Send(buf, messageLen, SocketFlags.None); + Debug.Assert(sent == messageLen); + + // Decode response. + var res = ReceiveMessage(); using (var stream = new BinaryHeapStream(res)) { @@ -140,43 +263,119 @@ namespace Apache.Ignite.Core.Impl.Client var errMsg = BinaryUtils.Marshaller.Unmarshal<string>(stream); throw new IgniteClientException(string.Format( - "Client handhsake failed: '{0}'. Client version: {1}. Server version: {2}", + "Client handshake failed: '{0}'. Client version: {1}. Server version: {2}", errMsg, version, serverVersion)); } } /// <summary> - /// Sends the request and receives a response. + /// Receives a message from socket. /// </summary> - private static byte[] SendReceive(Socket sock, Action<IBinaryStream> writeAction, int bufSize = 128) + private byte[] ReceiveMessage() { - int messageLen; - var buf = WriteMessage(writeAction, bufSize, out messageLen); + var size = GetInt(ReceiveBytes(4)); + var msg = ReceiveBytes(size); + return msg; + } - lock (sock) + /// <summary> + /// Receives the data filling provided buffer entirely. + /// </summary> + private byte[] ReceiveBytes(int size) + { + Debug.Assert(size > 0); + + // Socket.Receive can return any number of bytes, even 1. + // We should repeat Receive calls until required amount of data has been received. + var buf = new byte[size]; + var received = _socket.Receive(buf); + + while (received < size) { - var sent = sock.Send(buf, messageLen, SocketFlags.None); - Debug.Assert(sent == messageLen); + var res = _socket.Receive(buf, received, size - received, SocketFlags.None); + + if (res == 0) + { + // Disconnected. + _exception = _exception ?? new SocketException((int) SocketError.ConnectionAborted); + Dispose(); + CheckException(); + } + + received += res; + } - buf = new byte[4]; - var received = sock.Receive(buf); - Debug.Assert(received == buf.Length); + return buf; + } - using (var stream = new BinaryHeapStream(buf)) + /// <summary> + /// Sends the request synchronously. + /// </summary> + private BinaryHeapStream SendRequest(ref RequestMessage reqMsg) + { + // Do not enter lock when disposed. + CheckException(); + + // If there are no pending async requests, we can execute this operation synchronously, + // which is more efficient. + if (_sendRequestLock.TryEnterWriteLock(0)) + { + try { - var size = stream.ReadInt(); - - buf = new byte[size]; - received = sock.Receive(buf); + CheckException(); - while (received < size) + if (_requests.IsEmpty) { - received += sock.Receive(buf, received, size - received, SocketFlags.None); - } + _socket.Send(reqMsg.Buffer, 0, reqMsg.Length, SocketFlags.None); - return buf; + var respMsg = ReceiveMessage(); + var response = new BinaryHeapStream(respMsg); + var responseId = response.ReadLong(); + Debug.Assert(responseId == reqMsg.Id); + + return response; + } + } + finally + { + if (_sendRequestLock.IsWriteLockHeld) + { + _sendRequestLock.ExitWriteLock(); + } } } + + // Fallback to async mechanism. + return SendRequestAsync(ref reqMsg).Result; + } + + /// <summary> + /// Sends the request asynchronously and returns a task for corresponding response. + /// </summary> + private Task<BinaryHeapStream> SendRequestAsync(ref RequestMessage reqMsg) + { + // Do not enter lock when disposed. + CheckException(); + + _sendRequestLock.EnterReadLock(); + try + { + CheckException(); + + // Register. + var req = new Request(); + var added = _requests.TryAdd(reqMsg.Id, req); + Debug.Assert(added); + + // Send. + _socket.Send(reqMsg.Buffer, 0, reqMsg.Length, SocketFlags.None); + _listenerEvent.Set(); + return req.CompletionSource.Task; + } + finally + { + _sendRequestLock.ExitReadLock(); + } } /// <summary> @@ -184,18 +383,31 @@ namespace Apache.Ignite.Core.Impl.Client /// </summary> private static byte[] WriteMessage(Action<IBinaryStream> writeAction, int bufSize, out int messageLen) { - using (var stream = new BinaryHeapStream(bufSize)) - { - stream.WriteInt(0); // Reserve message size. + var stream = new BinaryHeapStream(bufSize); - writeAction(stream); + stream.WriteInt(0); // Reserve message size. + writeAction(stream); + stream.WriteInt(0, stream.Position - 4); // Write message size. - stream.WriteInt(0, stream.Position - 4); // Write message size. + messageLen = stream.Position; + return stream.GetArray(); + } + + /// <summary> + /// Writes the message to a byte array. + /// </summary> + private RequestMessage WriteMessage(Action<IBinaryStream> writeAction, ClientOp opId) + { + var requestId = Interlocked.Increment(ref _requestId); + var stream = new BinaryHeapStream(256); - messageLen = stream.Position; + stream.WriteInt(0); // Reserve message size. + stream.WriteShort((short) opId); + stream.WriteLong(requestId); + writeAction(stream); + stream.WriteInt(0, stream.Position - 4); // Write message size. - return stream.GetArray(); - } + return new RequestMessage(requestId, stream.GetArray(), stream.Position); } /// <summary> @@ -213,7 +425,10 @@ namespace Apache.Ignite.Core.Impl.Client { var socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp) { - NoDelay = cfg.TcpNoDelay + NoDelay = cfg.TcpNoDelay, + Blocking = true, + SendTimeout = (int) cfg.SocketTimeout.TotalMilliseconds, + ReceiveTimeout = (int) cfg.SocketTimeout.TotalMilliseconds }; if (cfg.SocketSendBufferSize != IgniteClientConfiguration.DefaultSocketBufferSize) @@ -274,13 +489,181 @@ namespace Apache.Ignite.Core.Impl.Client } /// <summary> + /// Checks if any of the current requests timed out. + /// </summary> + private void CheckTimeouts(object _) + { + if (_checkingTimeouts) + { + return; + } + + _checkingTimeouts = true; + + try + { + if (_exception != null) + { + _timeoutCheckTimer.Dispose(); + } + + foreach (var pair in _requests) + { + var req = pair.Value; + + if (req.Duration > _timeout) + { + Console.WriteLine(req.Duration); + req.CompletionSource.TrySetException(new SocketException((int)SocketError.TimedOut)); + + _requests.TryRemove(pair.Key, out req); + } + } + } + finally + { + _checkingTimeouts = false; + } + } + + /// <summary> + /// Gets the int from buffer. + /// </summary> + private static unsafe int GetInt(byte[] buf) + { + fixed (byte* b = buf) + { + return BinaryHeapStream.ReadInt0(b); + } + } + + /// <summary> + /// Checks the exception. + /// </summary> + private void CheckException() + { + var ex = _exception; + + if (ex != null) + { + throw ex; + } + } + + /// <summary> + /// Closes the socket and completes all pending requests with an error. + /// </summary> + private void EndRequestsWithError() + { + var ex = _exception; + Debug.Assert(ex != null); + + while (!_requests.IsEmpty) + { + foreach (var reqId in _requests.Keys.ToArray()) + { + Request req; + if (_requests.TryRemove(reqId, out req)) + { + req.CompletionSource.TrySetException(ex); + } + } + } + } + + /// <summary> /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// </summary> [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly", Justification = "There is no finalizer.")] public void Dispose() { - _socket.Dispose(); + lock (_disposeSyncRoot) + { + if (_isDisposed) + { + return; + } + + _exception = _exception ?? new ObjectDisposedException(typeof(ClientSocket).FullName); + EndRequestsWithError(); + _socket.Dispose(); + _listenerEvent.Set(); + _listenerEvent.Dispose(); + _timeoutCheckTimer.Dispose(); + + // Wait for lock to be released and dispose. + if (!_sendRequestLock.IsWriteLockHeld) + { + _sendRequestLock.EnterWriteLock(); + } + _sendRequestLock.ExitWriteLock(); + _sendRequestLock.Dispose(); + + _isDisposed = true; + } + } + + /// <summary> + /// Represents a request. + /// </summary> + private class Request + { + /** */ + private readonly TaskCompletionSource<BinaryHeapStream> _completionSource; + + /** */ + private readonly DateTime _startTime; + + /// <summary> + /// Initializes a new instance of the <see cref="Request"/> class. + /// </summary> + public Request() + { + _completionSource = new TaskCompletionSource<BinaryHeapStream>(); + _startTime = DateTime.Now; + } + + /// <summary> + /// Gets the completion source. + /// </summary> + public TaskCompletionSource<BinaryHeapStream> CompletionSource + { + get { return _completionSource; } + } + + /// <summary> + /// Gets the duration. + /// </summary> + public TimeSpan Duration + { + get { return DateTime.Now - _startTime; } + } + } + + /// <summary> + /// Represents a request message. + /// </summary> + private struct RequestMessage + { + /** */ + public readonly long Id; + + /** */ + public readonly byte[] Buffer; + + /** */ + public readonly int Length; + + /// <summary> + /// Initializes a new instance of the <see cref="RequestMessage"/> struct. + /// </summary> + public RequestMessage(long id, byte[] buffer, int length) + { + Id = id; + Length = length; + Buffer = buffer; + } } } }
