Repository: ignite Updated Branches: refs/heads/master 1308927a6 -> eab8334bb
IGNITE-4102 .NET: Generify ICacheStore This closes #1670 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eab8334b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eab8334b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eab8334b Branch: refs/heads/master Commit: eab8334bb49ceda249e742246d26f72539f9fa4c Parents: 1308927 Author: Pavel Tupitsyn <[email protected]> Authored: Mon Mar 27 16:02:47 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Mon Mar 27 16:02:47 2017 +0300 ---------------------------------------------------------------------- .../dotnet/PlatformDotNetCacheStore.java | 12 +- .../Cache/CacheConfigurationTest.cs | 2 +- .../Cache/Store/CacheStoreAdapterTest.cs | 14 +- .../Cache/Store/CacheStoreSessionTest.cs | 2 +- .../Cache/Store/CacheTestParallelLoadStore.cs | 16 +- .../Cache/Store/CacheTestStore.cs | 13 +- .../Apache.Ignite.Core.csproj | 4 +- .../Cache/Configuration/CacheConfiguration.cs | 2 +- .../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 12 +- .../Store/CacheParallelLoadStoreAdapter.cs | 38 ++- .../Cache/Store/CacheStoreAdapter.cs | 30 +- .../Cache/Store/ICacheStore.cs | 39 ++- .../Cache/Store/ICacheStoreSession.cs | 2 +- .../Datastream/IDataStreamer.cs | 2 +- .../Impl/Cache/Store/CacheStore.cs | 233 ++++----------- .../Impl/Cache/Store/CacheStoreInternal.cs | 285 +++++++++++++++++++ .../Impl/Cache/Store/ICacheStoreInternal.cs | 43 +++ .../Datagrid/StoreExample.cs | 1 + .../Datagrid/EmployeeStore.cs | 27 +- 19 files changed, 497 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java index c2f6001..dd61a54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java @@ -201,7 +201,11 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor writer.writeByte(OP_LOAD_ALL); writer.writeLong(session()); writer.writeString(ses.cacheName()); - writer.writeCollection(keys0); + + writer.writeInt(keys0.size()); + + for (Object o : keys0) + writer.writeObject(o); } }, new IgniteInClosureX<BinaryRawReaderEx>() { @Override public void applyx(BinaryRawReaderEx reader) { @@ -311,7 +315,11 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor writer.writeByte(OP_RMV_ALL); writer.writeLong(session()); writer.writeString(ses.cacheName()); - writer.writeCollection(keys); + + writer.writeInt(keys.size()); + + for (Object o : keys) + writer.writeObject(o); } }, null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs index 02c0fc3..7a30780 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs @@ -703,7 +703,7 @@ namespace Apache.Ignite.Core.Tests.Cache /// <summary> /// Test store. /// </summary> - private class CacheStoreTest : CacheStoreAdapter + private class CacheStoreTest : CacheStoreAdapter<object, object> { /** <inheritdoc /> */ public override object Load(object key) http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs index 6690584..02da750 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreAdapterTest.cs @@ -23,7 +23,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using NUnit.Framework; /// <summary> - /// Tests for <see cref="CacheStoreAdapter"/>. + /// Tests for <see cref="CacheStoreAdapter{K, V}"/>. /// </summary> public class CacheStoreAdapterTest { @@ -62,26 +62,26 @@ namespace Apache.Ignite.Core.Tests.Cache.Store /// <summary> /// Test store. /// </summary> - private class Store : CacheStoreAdapter + private class Store : CacheStoreAdapter<int, string> { /** */ - public readonly Dictionary<object, object> Map = new Dictionary<object, object>(); + public readonly Dictionary<int, string> Map = new Dictionary<int, string>(); /** <inheritdoc /> */ - public override object Load(object key) + public override string Load(int key) { - object res; + string res; return Map.TryGetValue(key, out res) ? res : null; } /** <inheritdoc /> */ - public override void Write(object key, object val) + public override void Write(int key, string val) { Map[key] = val; } /** <inheritdoc /> */ - public override void Delete(object key) + public override void Delete(int key) { Map.Remove(key); } http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs index d01726a..6f9d791 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs @@ -155,7 +155,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store /// Test store implementation. /// </summary> // ReSharper disable once UnusedMember.Global - public class Store : CacheStoreAdapter + public class Store : CacheStoreAdapter<object, object> { /** Store session. */ [StoreSessionResource] http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs index 81b4697..4786032 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs @@ -17,8 +17,6 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { - using System; - using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -28,7 +26,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store /// <summary> /// Test cache store with parallel load. /// </summary> - public class CacheTestParallelLoadStore : CacheParallelLoadStoreAdapter + public class CacheTestParallelLoadStore : + CacheParallelLoadStoreAdapter<object, object, CacheTestParallelLoadStore.Record> { /** Length of input data sequence */ public const int InputDataLength = 10000; @@ -61,23 +60,21 @@ namespace Apache.Ignite.Core.Tests.Cache.Store } /** <inheritdoc /> */ - protected override IEnumerable GetInputData() + protected override IEnumerable<Record> GetInputData() { return Enumerable.Range(0, InputDataLength).Select(x => new Record {Id = x, Name = "Test Record " + x}); } /** <inheritdoc /> */ - protected override KeyValuePair<object, object>? Parse(object inputRecord, params object[] args) + protected override KeyValuePair<object, object>? Parse(Record inputRecord, params object[] args) { var threadId = Thread.CurrentThread.ManagedThreadId; ThreadIds.GetOrAdd(threadId, threadId); var minId = (int)args[0]; - var rec = (Record)inputRecord; - - return rec.Id >= minId - ? new KeyValuePair<object, object>(rec.Id, rec) + return inputRecord.Id >= minId + ? new KeyValuePair<object, object>(inputRecord.Id, inputRecord) : (KeyValuePair<object, object>?) null; } @@ -94,6 +91,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store /// <summary> /// Gets or sets the name. /// </summary> + // ReSharper disable once UnusedAutoPropertyAccessor.Global public string Name { get; set; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs index f80f5ce..36b190f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using System; using System.Collections; using System.Collections.Concurrent; + using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; @@ -29,7 +30,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using Apache.Ignite.Core.Resource; [SuppressMessage("ReSharper", "FieldCanBeMadeReadOnly.Local")] - public class CacheTestStore : ICacheStore + public class CacheTestStore : ICacheStore<object, object> { public static readonly IDictionary Map = new ConcurrentDictionary<object, object>(); @@ -115,13 +116,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store return Map[key]; } - public IDictionary LoadAll(ICollection keys) + public IEnumerable<KeyValuePair<object, object>> LoadAll(IEnumerable<object> keys) { ThrowIfNeeded(); Debug.Assert(_grid != null); - return keys.OfType<object>().ToDictionary(key => key, key => "val_" + key); + return keys.ToDictionary(key => key, key =>(object)( "val_" + key)); } public void Write(object key, object val) @@ -133,13 +134,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store Map[key] = val; } - public void WriteAll(IDictionary map) + public void WriteAll(IEnumerable<KeyValuePair<object, object>> map) { ThrowIfNeeded(); Debug.Assert(_grid != null); - foreach (DictionaryEntry e in map) + foreach (var e in map) Map[e.Key] = e.Value; } @@ -152,7 +153,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store Map.Remove(key); } - public void DeleteAll(ICollection keys) + public void DeleteAll(IEnumerable<object> keys) { ThrowIfNeeded(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 58002db..eab0bb5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -191,6 +191,8 @@ <Compile Include="Impl\Binary\SerializableSerializer.cs" /> <Compile Include="Impl\Binary\BinaryWriterExtensions.cs" /> <Compile Include="Impl\Cache\Affinity\AffinityFunctionBase.cs" /> + <Compile Include="Impl\Cache\Store\CacheStore.cs" /> + <Compile Include="Impl\Cache\Store\ICacheStoreInternal.cs" /> <Compile Include="Impl\Transactions\CacheTransactionManager.cs" /> <Compile Include="Impl\Cache\Expiry\ExpiryPolicyFactory.cs" /> <Compile Include="Impl\Cache\Expiry\ExpiryPolicySerializer.cs" /> @@ -299,7 +301,7 @@ <Compile Include="Impl\Cache\Query\Continuous\ContinuousQueryUtils.cs" /> <Compile Include="Impl\Cache\Query\FieldsQueryCursor.cs" /> <Compile Include="Impl\Cache\Query\QueryCursor.cs" /> - <Compile Include="Impl\Cache\Store\CacheStore.cs" /> + <Compile Include="Impl\Cache\Store\CacheStoreInternal.cs" /> <Compile Include="Impl\Cache\Store\CacheStoreSession.cs" /> <Compile Include="Impl\Cache\Store\CacheStoreSessionProxy.cs" /> <Compile Include="Impl\Cluster\ClusterGroupImpl.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs index ebf412d..29d2ee3 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs @@ -602,7 +602,7 @@ namespace Apache.Ignite.Core.Cache.Configuration /// <summary> /// Maximum batch size for write-behind cache store operations. /// Store operations (get or remove) are combined in a batch of this size to be passed to - /// <see cref="ICacheStore.WriteAll"/> or <see cref="ICacheStore.DeleteAll"/> methods. + /// <see cref="ICacheStore{K, V}.WriteAll"/> or <see cref="ICacheStore{K, V}.DeleteAll"/> methods. /// </summary> [DefaultValue(DefaultWriteBehindBatchSize)] public int WriteBehindBatchSize { get; set; } http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs index 50938e1..77e47c7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs @@ -118,7 +118,7 @@ namespace Apache.Ignite.Core.Cache /// Optional predicate. If provided, will be used to filter values to be put into cache. /// </param> /// <param name="args"> - /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />. + /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />. /// </param> void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args); @@ -129,12 +129,12 @@ namespace Apache.Ignite.Core.Cache /// Optional predicate. If provided, will be used to filter values to be put into cache. /// </param> /// <param name="args"> - /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />. + /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />. /// </param> Task LoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args); /// <summary> - /// Delegates to <see cref="ICacheStore.LoadCache" /> method to load state + /// Delegates to <see cref="ICacheStore{K, V}.LoadCache" /> method to load state /// from the underlying persistent storage. The loaded values will then be given /// to the optionally passed in predicate, and, if the predicate returns true, /// will be stored in cache. If predicate is null, then all loaded values will be stored in cache. @@ -143,12 +143,12 @@ namespace Apache.Ignite.Core.Cache /// Optional predicate. If provided, will be used to filter values to be put into cache. /// </param> /// <param name="args"> - /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />. + /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />. /// </param> void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args); /// <summary> - /// Delegates to <see cref="ICacheStore.LoadCache" /> method to load state + /// Delegates to <see cref="ICacheStore{K, V}.LoadCache" /> method to load state /// from the underlying persistent storage. The loaded values will then be given /// to the optionally passed in predicate, and, if the predicate returns true, /// will be stored in cache. If predicate is null, then all loaded values will be stored in cache. @@ -157,7 +157,7 @@ namespace Apache.Ignite.Core.Cache /// Optional predicate. If provided, will be used to filter values to be put into cache. /// </param> /// <param name="args"> - /// Optional user arguments to be passed into <see cref="ICacheStore.LoadCache" />. + /// Optional user arguments to be passed into <see cref="ICacheStore{K, V}.LoadCache" />. /// </param> Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args); http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs index c506838..467b246 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheParallelLoadStoreAdapter.cs @@ -18,10 +18,8 @@ namespace Apache.Ignite.Core.Cache.Store { using System; - using System.Collections; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; - using System.Linq; using System.Threading.Tasks; /// <summary> @@ -32,19 +30,17 @@ namespace Apache.Ignite.Core.Cache.Store /// GetInputData().GetEnumerator() result will be disposed if it implements IDisposable. /// Any additional post-LoadCache steps can be performed by overriding LoadCache method. /// </remarks> - public abstract class CacheParallelLoadStoreAdapter : ICacheStore + /// <typeparam name="TK">Key type.</typeparam> + /// <typeparam name="TV">Value type.</typeparam> + /// <typeparam name="TData">Custom data entry type.</typeparam> + public abstract class CacheParallelLoadStoreAdapter<TK, TV, TData> : ICacheStore<TK, TV> { /// <summary> - /// Default number of working threads (equal to the number of available processors). - /// </summary> - public static readonly int DefaultThreadsCount = Environment.ProcessorCount; - - /// <summary> /// Constructor. /// </summary> protected CacheParallelLoadStoreAdapter() { - MaxDegreeOfParallelism = DefaultThreadsCount; + MaxDegreeOfParallelism = Environment.ProcessorCount; } /// <summary> @@ -62,7 +58,7 @@ namespace Apache.Ignite.Core.Cache.Store /// <param name="act">Action for loaded values.</param> /// <param name="args">Optional arguemnts passed to <see cref="ICache{K,V}.LocalLoadCache" /> method.</param> /// <exception cref="CacheStoreException" /> - public virtual void LoadCache(Action<object, object> act, params object[] args) + public virtual void LoadCache(Action<TK, TV> act, params object[] args) { if (MaxDegreeOfParallelism == 0 || MaxDegreeOfParallelism < -1) throw new ArgumentOutOfRangeException("MaxDegreeOfParallelism must be either positive or -1: " + @@ -70,7 +66,7 @@ namespace Apache.Ignite.Core.Cache.Store var options = new ParallelOptions {MaxDegreeOfParallelism = MaxDegreeOfParallelism}; - Parallel.ForEach(GetInputData().OfType<object>(), options, item => + Parallel.ForEach(GetInputData(), options, item => { var cacheEntry = Parse(item, args); @@ -83,19 +79,19 @@ namespace Apache.Ignite.Core.Cache.Store /// Gets the input data sequence to be used in LoadCache. /// </summary> [SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Semantics.")] - protected abstract IEnumerable GetInputData(); + protected abstract IEnumerable<TData> GetInputData(); /// <summary> /// This method should transform raw data records from GetInputData /// into valid key-value pairs to be stored into cache. /// </summary> - protected abstract KeyValuePair<object, object>? Parse(object inputRecord, params object[] args); + protected abstract KeyValuePair<TK, TV>? Parse(TData inputRecord, params object[] args); /// <summary> /// Gets or sets the maximum degree of parallelism to use in LoadCache. /// Must be either positive or -1 for unlimited amount of threads. /// <para /> - /// Defaults to <see cref="DefaultThreadsCount"/>. + /// Defaults to <see cref="Environment.ProcessorCount"/>. /// </summary> public int MaxDegreeOfParallelism { get; set; } @@ -111,9 +107,9 @@ namespace Apache.Ignite.Core.Cache.Store /// or <c>null</c> if the object can't be loaded /// </returns> [ExcludeFromCodeCoverage] - public virtual object Load(object key) + public virtual TV Load(TK key) { - return null; + return default(TV); } /// <summary> @@ -126,7 +122,7 @@ namespace Apache.Ignite.Core.Cache.Store /// A map of key, values to be stored in the cache. /// </returns> [ExcludeFromCodeCoverage] - public virtual IDictionary LoadAll(ICollection keys) + public virtual IEnumerable<KeyValuePair<TK, TV>> LoadAll(IEnumerable<TK> keys) { return null; } @@ -139,7 +135,7 @@ namespace Apache.Ignite.Core.Cache.Store /// <param name="key">Key to write.</param> /// <param name="val">Value to write.</param> [ExcludeFromCodeCoverage] - public virtual void Write(object key, object val) + public virtual void Write(TK key, TV val) { // No-op. } @@ -158,7 +154,7 @@ namespace Apache.Ignite.Core.Cache.Store /// to write for write-through. Upon return the collection must only contain entries /// that were not successfully written. (see partial success above).</param> [ExcludeFromCodeCoverage] - public virtual void WriteAll(IDictionary entries) + public virtual void WriteAll(IEnumerable<KeyValuePair<TK, TV>> entries) { // No-op. } @@ -172,7 +168,7 @@ namespace Apache.Ignite.Core.Cache.Store /// </summary> /// <param name="key">The key that is used for the delete operation.</param> [ExcludeFromCodeCoverage] - public virtual void Delete(object key) + public virtual void Delete(TK key) { // No-op. } @@ -195,7 +191,7 @@ namespace Apache.Ignite.Core.Cache.Store /// it contains the keys to delete for write-through. Upon return the collection must only contain /// the keys that were not successfully deleted.</param> [ExcludeFromCodeCoverage] - public virtual void DeleteAll(ICollection keys) + public virtual void DeleteAll(IEnumerable<TK> keys) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs index a38678d..769c4c2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/CacheStoreAdapter.cs @@ -18,7 +18,7 @@ namespace Apache.Ignite.Core.Cache.Store { using System; - using System.Collections; + using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; @@ -34,7 +34,9 @@ namespace Apache.Ignite.Core.Cache.Store /// Note that <c>LoadCache</c> method has empty implementation because it is /// essentially up to the user to invoke it with specific arguments. /// </summary> - public abstract class CacheStoreAdapter : ICacheStore + /// <typeparam name="TK">Key type.</typeparam> + /// <typeparam name="TV">Value type.</typeparam> + public abstract class CacheStoreAdapter<TK, TV> : ICacheStore<TK, TV> { /// <summary> /// Loads all values from underlying persistent storage. Note that keys are @@ -50,11 +52,11 @@ namespace Apache.Ignite.Core.Cache.Store /// </summary> /// <param name="act">Action for loaded values.</param> /// <param name="args">Optional arguemnts passed to <see cref="ICache{K,V}.LocalLoadCache" /> method.</param> - public virtual void LoadCache(Action<object, object> act, params object[] args) + public virtual void LoadCache(Action<TK, TV> act, params object[] args) { // No-op. } - + /// <summary> /// Loads multiple objects. Application developers should implement this method to customize /// the loading of cache entries. This method is called when the requested object is not in the cache. @@ -64,19 +66,19 @@ namespace Apache.Ignite.Core.Cache.Store /// <returns> /// A map of key, values to be stored in the cache. /// </returns> - public virtual IDictionary LoadAll(ICollection keys) + public virtual IEnumerable<KeyValuePair<TK, TV>> LoadAll(IEnumerable<TK> keys) { - return keys.OfType<object>().ToDictionary(key => key, Load); + return keys.ToDictionary(key => key, Load); } - + /// <summary> /// Writes all. /// </summary> /// <param name="entries">The map.</param> [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")] - public virtual void WriteAll(IDictionary entries) + public virtual void WriteAll(IEnumerable<KeyValuePair<TK, TV>> entries) { - foreach (DictionaryEntry entry in entries) + foreach (var entry in entries) Write(entry.Key, entry.Value); } @@ -98,9 +100,9 @@ namespace Apache.Ignite.Core.Cache.Store /// it contains the keys to delete for write-through. Upon return the collection must only contain /// the keys that were not successfully deleted.</param> [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")] - public virtual void DeleteAll(ICollection keys) + public virtual void DeleteAll(IEnumerable<TK> keys) { - foreach (object key in keys) + foreach (var key in keys) Delete(key); } @@ -125,7 +127,7 @@ namespace Apache.Ignite.Core.Cache.Store /// The value for the entry that is to be stored in the cache /// or <c>null</c> if the object can't be loaded /// </returns> - public abstract object Load(object key); + public abstract TV Load(TK key); /// <summary> /// Write the specified value under the specified key to the external resource. @@ -134,7 +136,7 @@ namespace Apache.Ignite.Core.Cache.Store /// </summary> /// <param name="key">Key to write.</param> /// <param name="val">Value to write.</param> - public abstract void Write(object key, object val); + public abstract void Write(TK key, TV val); /// <summary> /// Delete the cache entry from the external resource. @@ -144,6 +146,6 @@ namespace Apache.Ignite.Core.Cache.Store /// This method is invoked even if no mapping for the key exists. /// </summary> /// <param name="key">The key that is used for the delete operation.</param> - public abstract void Delete(object key); + public abstract void Delete(TK key); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs index d6e4f80..044784a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStore.cs @@ -18,12 +18,19 @@ namespace Apache.Ignite.Core.Cache.Store { using System; - using System.Collections; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Transactions; /// <summary> /// API for cache persistent storage for read-through and write-through behavior. - /// + /// <para /> + /// Generic argument types depend on <see cref="CacheConfiguration.KeepBinaryInStore"/> property. + /// When <c>true</c> (default), cache store operates on <see cref="IBinaryObject"/> instances. + /// Otherwise, generic arguments should be the same as in corresponding <see cref="ICache{TK, TV}"/>. + /// <para /> /// Persistent store is configured in Ignite's Spring XML configuration file via /// <c>CacheConfiguration.setStore()</c> property. If you have an implementation /// of cache store in .NET, you should use special Java wrapper which accepts assembly name and @@ -75,7 +82,9 @@ namespace Apache.Ignite.Core.Cache.Store /// </code> /// </example> /// </summary> - public interface ICacheStore + /// <typeparam name="TK">Key type.</typeparam> + /// <typeparam name="TV">Value type.</typeparam> + public interface ICacheStore<TK, TV> : ICacheStore { /// <summary> /// Loads all values from underlying persistent storage. Note that keys are @@ -92,7 +101,7 @@ namespace Apache.Ignite.Core.Cache.Store /// <param name="act">Action for loaded values.</param> /// <param name="args">Optional arguemnts passed to <see cref="ICache{K,V}.LocalLoadCache"/> method.</param> /// <exception cref="CacheStoreException" /> - void LoadCache(Action<object, object> act, params object[] args); + void LoadCache(Action<TK, TV> act, params object[] args); /// <summary> /// Loads an object. Application developers should implement this method to customize the loading @@ -104,7 +113,7 @@ namespace Apache.Ignite.Core.Cache.Store /// <returns>The value for the entry that is to be stored in the cache /// or <c>null</c> if the object can't be loaded</returns> /// <exception cref="CacheStoreException" /> - object Load(object key); + TV Load(TK key); /// <summary> /// Loads multiple objects. Application developers should implement this method to customize @@ -114,7 +123,7 @@ namespace Apache.Ignite.Core.Cache.Store /// <param name="keys">Keys identifying the values to be loaded.</param> /// <returns>A map of key, values to be stored in the cache.</returns> /// <exception cref="CacheStoreException" /> - IDictionary LoadAll(ICollection keys); + IEnumerable<KeyValuePair<TK, TV>> LoadAll(IEnumerable<TK> keys); /// <summary> /// Write the specified value under the specified key to the external resource. @@ -124,7 +133,7 @@ namespace Apache.Ignite.Core.Cache.Store /// <param name="key">Key to write.</param> /// <param name="val">Value to write.</param> /// <exception cref="CacheStoreException" /> - void Write(object key, object val); + void Write(TK key, TV val); /// <summary> /// Write the specified entries to the external resource. @@ -140,7 +149,7 @@ namespace Apache.Ignite.Core.Cache.Store /// to write for write-through. Upon return the collection must only contain entries /// that were not successfully written. (see partial success above).</param> /// <exception cref="CacheStoreException" /> - void WriteAll(IDictionary entries); + void WriteAll(IEnumerable<KeyValuePair<TK, TV>> entries); /// <summary> /// Delete the cache entry from the external resource. @@ -151,7 +160,7 @@ namespace Apache.Ignite.Core.Cache.Store /// </summary> /// <param name="key">The key that is used for the delete operation.</param> /// <exception cref="CacheStoreException" /> - void Delete(object key); + void Delete(TK key); /// <summary> /// Remove data and keys from the external resource for the given collection of keys, if present. @@ -171,7 +180,7 @@ namespace Apache.Ignite.Core.Cache.Store /// it contains the keys to delete for write-through. Upon return the collection must only contain /// the keys that were not successfully deleted.</param> /// <exception cref="CacheStoreException" /> - void DeleteAll(ICollection keys); + void DeleteAll(IEnumerable<TK> keys); /// <summary> /// Tells store to commit or rollback a transaction depending on the value of the @@ -181,4 +190,14 @@ namespace Apache.Ignite.Core.Cache.Store /// <exception cref="CacheStoreException" /> void SessionEnd(bool commit); } + + /// <summary> + /// Non-generic base type for <see cref="ICacheStore{TK,TV}"/>, used only for configuration property. + /// Users should implement generic <see cref="ICacheStore{TK,TV}"/>. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1040:AvoidEmptyInterfaces")] + public interface ICacheStore + { + // No-op. + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs index e20a660..bd9ccdf 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Store/ICacheStoreSession.cs @@ -23,7 +23,7 @@ namespace Apache.Ignite.Core.Cache.Store /// Session for the cache store operations. The main purpose of cache store session /// is to hold context between multiple store invocations whenever in transaction. For example, /// you can save current database connection in the session <see cref="Properties"/> map. You can then - /// commit this connection in the <see cref="ICacheStore.SessionEnd(bool)"/> method. + /// commit this connection in the <see cref="ICacheStore{K,V}.SessionEnd(bool)"/> method. /// </summary> public interface ICacheStoreSession { http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs index 64c0f9e..d18040f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs @@ -34,7 +34,7 @@ namespace Apache.Ignite.Core.Datastream /// <para /> /// Also note that <c>IDataStreamer</c> is not the only way to load data into cache. /// Alternatively you can use - /// <see cref="ICacheStore.LoadCache(Action{object, object}, object[])"/> + /// <see cref="ICacheStore{K, V}.LoadCache(Action{K, V}, object[])"/> /// method to load data from underlying data store. You can also use standard cache /// <c>put</c> and <c>putAll</c> operations as well, but they most likely will not perform /// as well as this class for loading data. And finally, data can be loaded from underlying http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs index befe72b..f728e2b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs @@ -17,74 +17,37 @@ namespace Apache.Ignite.Core.Impl.Cache.Store { - using System.Collections; + using System; using System.Diagnostics; - using System.IO; - using Apache.Ignite.Core.Binary; + using System.Globalization; + using System.Linq; using Apache.Ignite.Core.Cache.Store; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Handle; - using Apache.Ignite.Core.Impl.Resource; + using Apache.Ignite.Core.Impl.Memory; /// <summary> - /// Interop cache store. + /// Interop cache store, delegates to generic <see cref="CacheStoreInternal{TK,TV}"/> wrapper. /// </summary> internal class CacheStore { - /** */ - private const byte OpLoadCache = 0; - - /** */ - private const byte OpLoad = 1; - - /** */ - private const byte OpLoadAll = 2; - - /** */ - private const byte OpPut = 3; - - /** */ - private const byte OpPutAll = 4; - - /** */ - private const byte OpRmv = 5; - - /** */ - private const byte OpRmvAll = 6; - - /** */ - private const byte OpSesEnd = 7; - - /** */ - private readonly bool _convertBinary; - /** Store. */ - private readonly ICacheStore _store; - - /** Session. */ - private readonly CacheStoreSessionProxy _sesProxy; + private readonly ICacheStoreInternal _store; /** */ private readonly long _handle; - + /// <summary> /// Initializes a new instance of the <see cref="CacheStore" /> class. /// </summary> /// <param name="store">Store.</param> - /// <param name="convertBinary">Whether to convert binary objects.</param> /// <param name="registry">The handle registry.</param> - private CacheStore(ICacheStore store, bool convertBinary, HandleRegistry registry) + private CacheStore(ICacheStoreInternal store, HandleRegistry registry) { Debug.Assert(store != null); _store = store; - _convertBinary = convertBinary; - - _sesProxy = new CacheStoreSessionProxy(); - - ResourceProcessor.InjectStoreSession(store, _sesProxy); _handle = registry.AllocateCritical(this); } @@ -97,7 +60,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Store /// <returns> /// Interop cache store. /// </returns> - internal static CacheStore CreateInstance(long memPtr, HandleRegistry registry) + public static CacheStore CreateInstance(long memPtr, HandleRegistry registry) { using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { @@ -109,7 +72,14 @@ namespace Apache.Ignite.Core.Impl.Cache.Store ICacheStore store; if (factory != null) + { store = factory.CreateInstance(); + + if (store == null) + { + throw new IgniteException("Cache store factory should not return null: " + factory.GetType()); + } + } else { var className = reader.ReadString(); @@ -118,8 +88,13 @@ namespace Apache.Ignite.Core.Impl.Cache.Store store = IgniteUtils.CreateInstance<ICacheStore>(className, propertyMap); } + var iface = GetCacheStoreInterface(store); - return new CacheStore(store, convertBinary, registry); + var storeType = typeof(CacheStoreInternal<,>).MakeGenericType(iface.GetGenericArguments()); + + var storeInt = (ICacheStoreInternal)Activator.CreateInstance(storeType, store, convertBinary); + + return new CacheStore(storeInt, registry); } } @@ -137,7 +112,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Store /// <param name="grid">Grid.</param> public void Init(Ignite grid) { - ResourceProcessor.Inject(_store, grid); + _store.Init(grid); } /// <summary> @@ -147,148 +122,36 @@ namespace Apache.Ignite.Core.Impl.Cache.Store /// <param name="grid">Grid.</param> /// <returns>Invocation result.</returns> /// <exception cref="IgniteException">Invalid operation type: + opType</exception> - public int Invoke(IBinaryStream stream, Ignite grid) + public long Invoke(PlatformMemoryStream stream, Ignite grid) { - IBinaryReader reader = grid.Marshaller.StartUnmarshal(stream, - _convertBinary ? BinaryMode.Deserialize : BinaryMode.ForceBinary); - - IBinaryRawReader rawReader = reader.GetRawReader(); - - int opType = rawReader.ReadByte(); - - // Setup cache session for this invocation. - long sesId = rawReader.ReadLong(); - - CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true); - - ses.CacheName = rawReader.ReadString(); - - _sesProxy.SetSession(ses); + return _store.Invoke(stream, grid); + } + + /// <summary> + /// Gets the generic <see cref="ICacheStore{TK,TV}"/> interface type. + /// </summary> + private static Type GetCacheStoreInterface(ICacheStore store) + { + var ifaces = store.GetType().GetInterfaces() + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICacheStore<,>)) + .ToArray(); - try + if (ifaces.Length == 0) { - // Perform operation. - switch (opType) - { - case OpLoadCache: - { - var args = rawReader.ReadArray<object>(); - - stream.Seek(0, SeekOrigin.Begin); - - int cnt = 0; - stream.WriteInt(cnt); // Reserve space for count. - - var writer = grid.Marshaller.StartMarshal(stream); - - _store.LoadCache((k, v) => - { - lock (writer) // User-defined store can be multithreaded. - { - writer.WithDetach(w => - { - w.WriteObject(k); - w.WriteObject(v); - }); - - cnt++; - } - }, args); - - stream.WriteInt(0, cnt); - - grid.Marshaller.FinishMarshal(writer); - - break; - } - - case OpLoad: - { - var val = _store.Load(rawReader.ReadObject<object>()); - - stream.Seek(0, SeekOrigin.Begin); - - var writer = grid.Marshaller.StartMarshal(stream); - - writer.WriteObject(val); - - grid.Marshaller.FinishMarshal(writer); - - break; - } - - case OpLoadAll: - { - var keys = rawReader.ReadCollection(); - - var result = _store.LoadAll(keys); - - stream.Seek(0, SeekOrigin.Begin); - - stream.WriteInt(result.Count); - - var writer = grid.Marshaller.StartMarshal(stream); - - foreach (DictionaryEntry entry in result) - { - var entry0 = entry; // Copy modified closure. - - writer.WithDetach(w => - { - w.WriteObject(entry0.Key); - w.WriteObject(entry0.Value); - }); - } - - grid.Marshaller.FinishMarshal(writer); - - break; - } - - case OpPut: - _store.Write(rawReader.ReadObject<object>(), rawReader.ReadObject<object>()); - - break; - - case OpPutAll: - var size = rawReader.ReadInt(); - - var dict = new Hashtable(size); - - for (int i = 0; i < size; i++) - dict[rawReader.ReadObject<object>()] = rawReader.ReadObject<object>(); - - _store.WriteAll(dict); - - break; - - case OpRmv: - _store.Delete(rawReader.ReadObject<object>()); - - break; - - case OpRmvAll: - _store.DeleteAll(rawReader.ReadCollection()); - - break; - - case OpSesEnd: - grid.HandleRegistry.Release(sesId); - - _store.SessionEnd(rawReader.ReadBoolean()); - - break; - - default: - throw new IgniteException("Invalid operation type: " + opType); - } - - return 0; + throw new IgniteException(string.Format( + CultureInfo.InvariantCulture, "Cache store should implement generic {0} interface: {1}", + typeof(ICacheStore<,>), store.GetType())); } - finally + + if (ifaces.Length > 1) { - _sesProxy.ClearSession(); + throw new IgniteException(string.Format( + CultureInfo.InvariantCulture, "Cache store should not implement generic {0} " + + "interface more than once: {1}", + typeof(ICacheStore<,>), store.GetType())); } + + return ifaces[0]; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs new file mode 100644 index 0000000..f147579 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Store +{ + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cache.Store; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Resource; + + /// <summary> + /// Generic cache store wrapper. + /// </summary> + internal class CacheStoreInternal<TK, TV> : ICacheStoreInternal + { + /** */ + private const byte OpLoadCache = 0; + + /** */ + private const byte OpLoad = 1; + + /** */ + private const byte OpLoadAll = 2; + + /** */ + private const byte OpPut = 3; + + /** */ + private const byte OpPutAll = 4; + + /** */ + private const byte OpRmv = 5; + + /** */ + private const byte OpRmvAll = 6; + + /** */ + private const byte OpSesEnd = 7; + + /** */ + private readonly bool _convertBinary; + + /** User store. */ + private readonly ICacheStore<TK, TV> _store; + + /** Session. */ + private readonly CacheStoreSessionProxy _sesProxy; + + /// <summary> + /// Initializes a new instance of the <see cref="CacheStoreInternal{TK,TV}"/> class. + /// </summary> + public CacheStoreInternal(ICacheStore<TK, TV> store, bool convertBinary) + { + Debug.Assert(store != null); + + _store = store; + + _convertBinary = convertBinary; + + _sesProxy = new CacheStoreSessionProxy(); + + ResourceProcessor.InjectStoreSession(store, _sesProxy); + } + + /// <summary> + /// Initializes this instance with a grid. + /// </summary> + /// <param name="grid">Grid.</param> + public void Init(Ignite grid) + { + ResourceProcessor.Inject(_store, grid); + } + + /// <summary> + /// Invokes a store operation. + /// </summary> + /// <param name="stream">Input stream.</param> + /// <param name="grid">Grid.</param> + /// <returns>Invocation result.</returns> + /// <exception cref="IgniteException">Invalid operation type: + opType</exception> + public int Invoke(IBinaryStream stream, Ignite grid) + { + IBinaryReader reader = grid.Marshaller.StartUnmarshal(stream, + _convertBinary ? BinaryMode.Deserialize : BinaryMode.ForceBinary); + + IBinaryRawReader rawReader = reader.GetRawReader(); + + int opType = rawReader.ReadByte(); + + // Setup cache session for this invocation. + long sesId = rawReader.ReadLong(); + + CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true); + + ses.CacheName = rawReader.ReadString(); + + _sesProxy.SetSession(ses); + + try + { + // Perform operation. + switch (opType) + { + case OpLoadCache: + { + var args = rawReader.ReadArray<object>(); + + stream.Seek(0, SeekOrigin.Begin); + + int cnt = 0; + stream.WriteInt(cnt); // Reserve space for count. + + var writer = grid.Marshaller.StartMarshal(stream); + + _store.LoadCache((k, v) => + { + lock (writer) // User-defined store can be multithreaded. + { + writer.WithDetach(w => + { + w.WriteObject(k); + w.WriteObject(v); + }); + + cnt++; + } + }, args); + + stream.WriteInt(0, cnt); + + grid.Marshaller.FinishMarshal(writer); + + break; + } + + case OpLoad: + { + var val = _store.Load(rawReader.ReadObject<TK>()); + + stream.Seek(0, SeekOrigin.Begin); + + var writer = grid.Marshaller.StartMarshal(stream); + + writer.WriteObject(val); + + grid.Marshaller.FinishMarshal(writer); + + break; + } + + case OpLoadAll: + { + // We can't do both read and write lazily because stream is reused. + // Read keys non-lazily, write result lazily. + var keys = ReadAllKeys(rawReader); + + var result = _store.LoadAll(keys); + + stream.Seek(0, SeekOrigin.Begin); + + int cnt = 0; + stream.WriteInt(cnt); // Reserve space for count. + + var writer = grid.Marshaller.StartMarshal(stream); + + foreach (var entry in result) + { + var entry0 = entry; // Copy modified closure. + + writer.WithDetach(w => + { + w.WriteObject(entry0.Key); + w.WriteObject(entry0.Value); + }); + + cnt++; + } + + stream.WriteInt(0, cnt); + + grid.Marshaller.FinishMarshal(writer); + + break; + } + + case OpPut: + _store.Write(rawReader.ReadObject<TK>(), rawReader.ReadObject<TV>()); + + break; + + case OpPutAll: + _store.WriteAll(ReadPairs(rawReader)); + + break; + + case OpRmv: + _store.Delete(rawReader.ReadObject<TK>()); + + break; + + case OpRmvAll: + _store.DeleteAll(ReadKeys(rawReader)); + + break; + + case OpSesEnd: + grid.HandleRegistry.Release(sesId); + + _store.SessionEnd(rawReader.ReadBoolean()); + + break; + + default: + throw new IgniteException("Invalid operation type: " + opType); + } + + return 0; + } + finally + { + _sesProxy.ClearSession(); + } + } + + /// <summary> + /// Reads key-value pairs. + /// </summary> + private static IEnumerable<KeyValuePair<TK, TV>> ReadPairs(IBinaryRawReader rawReader) + { + var size = rawReader.ReadInt(); + + for (var i = 0; i < size; i++) + { + yield return new KeyValuePair<TK, TV>(rawReader.ReadObject<TK>(), rawReader.ReadObject<TV>()); + } + } + + /// <summary> + /// Reads the keys. + /// </summary> + private static IEnumerable<TK> ReadKeys(IBinaryRawReader reader) + { + var cnt = reader.ReadInt(); + + for (var i = 0; i < cnt; i++) + { + yield return reader.ReadObject<TK>(); + } + } + /// <summary> + /// Reads the keys. + /// </summary> + private static ICollection<TK> ReadAllKeys(IBinaryRawReader reader) + { + var cnt = reader.ReadInt(); + var res = new List<TK>(cnt); + + for (var i = 0; i < cnt; i++) + { + res.Add(reader.ReadObject<TK>()); + } + + return res; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs new file mode 100644 index 0000000..7ec44d9 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/ICacheStoreInternal.cs @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Store +{ + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Binary.IO; + + /// <summary> + /// Provides a non-generic way to work with <see cref="CacheStoreInternal{TK, TV}"/>. + /// </summary> + internal interface ICacheStoreInternal + { + /// <summary> + /// Invokes a store operation. + /// </summary> + /// <param name="stream">Input stream.</param> + /// <param name="grid">Grid.</param> + /// <returns>Invocation result.</returns> + /// <exception cref="IgniteException">Invalid operation type: + opType</exception> + int Invoke(IBinaryStream stream, Ignite grid); + + /// <summary> + /// Initializes this instance with a grid. + /// </summary> + /// <param name="grid">Grid.</param> + void Init(Ignite grid); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs index 62da647..6915d79 100644 --- a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs +++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Datagrid/StoreExample.cs @@ -59,6 +59,7 @@ namespace Apache.Ignite.Examples.Datagrid Name = CacheName, ReadThrough = true, WriteThrough = true, + KeepBinaryInStore = false, // Cache store works with deserialized data. CacheStoreFactory = new EmployeeStoreFactory() }); http://git-wip-us.apache.org/repos/asf/ignite/blob/eab8334b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs index 7049011..9eb6539 100644 --- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs +++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Datagrid/EmployeeStore.cs @@ -18,7 +18,6 @@ namespace Apache.Ignite.ExamplesDll.Datagrid { using System; - using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using Apache.Ignite.Core.Cache; @@ -28,22 +27,22 @@ namespace Apache.Ignite.ExamplesDll.Datagrid /// <summary> /// Example cache store implementation. /// </summary> - public class EmployeeStore : CacheStoreAdapter + public class EmployeeStore : CacheStoreAdapter<int, Employee> { /// <summary> /// Dictionary representing the store. /// </summary> - private readonly ConcurrentDictionary<object, object> _db = new ConcurrentDictionary<object, object>( - new List<KeyValuePair<object, object>> + private readonly ConcurrentDictionary<int, Employee> _db = new ConcurrentDictionary<int, Employee>( + new List<KeyValuePair<int, Employee>> { - new KeyValuePair<object, object>(1, new Employee( + new KeyValuePair<int, Employee>(1, new Employee( "Allison Mathis", 25300, new Address("2702 Freedom Lane, San Francisco, CA", 94109), new List<string> {"Development"} )), - new KeyValuePair<object, object>(2, new Employee( + new KeyValuePair<int, Employee>(2, new Employee( "Breana Robbin", 6500, new Address("3960 Sundown Lane, Austin, TX", 78130), @@ -57,7 +56,7 @@ namespace Apache.Ignite.ExamplesDll.Datagrid /// </summary> /// <param name="act">Action that loads a cache entry.</param> /// <param name="args">Optional arguments.</param> - public override void LoadCache(Action<object, object> act, params object[] args) + public override void LoadCache(Action<int, Employee> act, params object[] args) { // Iterate over whole underlying store and call act on each entry to load it into the cache. foreach (var entry in _db) @@ -72,9 +71,9 @@ namespace Apache.Ignite.ExamplesDll.Datagrid /// <returns> /// A map of key, values to be stored in the cache. /// </returns> - public override IDictionary LoadAll(ICollection keys) + public override IEnumerable<KeyValuePair<int, Employee>> LoadAll(IEnumerable<int> keys) { - var result = new Dictionary<object, object>(); + var result = new Dictionary<int, Employee>(); foreach (var key in keys) result[key] = Load(key); @@ -88,9 +87,9 @@ namespace Apache.Ignite.ExamplesDll.Datagrid /// </summary> /// <param name="key">Key to load.</param> /// <returns>Loaded value</returns> - public override object Load(object key) + public override Employee Load(int key) { - object val; + Employee val; _db.TryGetValue(key, out val); @@ -102,7 +101,7 @@ namespace Apache.Ignite.ExamplesDll.Datagrid /// </summary> /// <param name="key">Key to write.</param> /// <param name="val">Value to write.</param> - public override void Write(object key, object val) + public override void Write(int key, Employee val) { _db[key] = val; } @@ -111,9 +110,9 @@ namespace Apache.Ignite.ExamplesDll.Datagrid /// Delete cache entry form store. /// </summary> /// <param name="key">Key to delete.</param> - public override void Delete(object key) + public override void Delete(int key) { - object val; + Employee val; _db.TryRemove(key, out val); }
