IGNITE-2844: .NET: Added "LoadAll" methods to cache API. This closes #562.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc9730a9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc9730a9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc9730a9 Branch: refs/heads/ignite-gg-10994 Commit: fc9730a9ae33b36ee8b6430583b39f13dfdd16de Parents: 0013955 Author: Pavel Tupitsyn <[email protected]> Authored: Wed Mar 23 12:44:44 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Mar 23 12:44:44 2016 +0300 ---------------------------------------------------------------------- .../platform/cache/PlatformCache.java | 54 ++++++++++++++++++++ .../platform/utils/PlatformFutureUtils.java | 2 +- .../Cache/CacheTestAsyncWrapper.cs | 12 +++++ .../Cache/Store/CacheStoreTest.cs | 31 +++++++++++ .../Cache/Store/CacheTestStore.cs | 2 +- .../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 26 ++++++++++ .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 17 ++++++ .../Apache.Ignite.Core/Impl/Cache/CacheOp.cs | 3 +- 8 files changed, 144 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index 37fd335..35ccd19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -44,16 +44,19 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformFields import 
org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor; import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; import javax.cache.Cache; import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CompletionListener; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; import java.util.Iterator; @@ -183,6 +186,9 @@ public class PlatformCache extends PlatformAbstractTarget { /** */ public static final int OP_GET_CONFIG = 39; + /** */ + public static final int OP_LOAD_ALL = 40; + /** Underlying JCache. */ private final IgniteCacheProxy cache; @@ -369,6 +375,19 @@ public class PlatformCache extends PlatformAbstractTarget { case OP_IS_LOCAL_LOCKED: return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? 
TRUE : FALSE; + case OP_LOAD_ALL: { + long futId = reader.readLong(); + boolean replaceExisting = reader.readBoolean(); + + CompletionListenable fut = new CompletionListenable(); + + PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this); + + cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut); + + return TRUE; + } + default: return super.processInStreamOutLong(type, reader); } @@ -1101,4 +1120,39 @@ public class PlatformCache extends PlatformAbstractTarget { } } } + + /** + * Listenable around CompletionListener. + */ + private static class CompletionListenable implements PlatformListenable, CompletionListener { + /** */ + private IgniteBiInClosure<Object, Throwable> lsnr; + + /** {@inheritDoc} */ + @Override public void onCompletion() { + assert lsnr != null; + + lsnr.apply(null, null); + } + + /** {@inheritDoc} */ + @Override public void onException(Exception e) { + lsnr.apply(null, e); + } + + /** {@inheritDoc} */ + @Override public void listen(IgniteBiInClosure<Object, Throwable> lsnr) { + this.lsnr = lsnr; + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index 7a86201..8fad7d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -182,7 +182,7 @@ public class PlatformFutureUtils { * @param writer Optional writer. */ @SuppressWarnings("unchecked") - private static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final + public static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) { final PlatformCallbackGateway gate = ctx.gateway(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs index 09e57dc..ff0c37c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs @@ -119,6 +119,18 @@ namespace Apache.Ignite.Core.Tests.Cache } /** <inheritDoc /> */ + public void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues) + { + _cache.LoadAll(keys, replaceExistingValues); + } + + /** <inheritDoc /> */ + public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues) + { + return _cache.LoadAllAsync(keys, replaceExistingValues); + } + + /** <inheritDoc /> */ public bool ContainsKey(TK key) { return GetResult(_cache.ContainsKeyAsync(key)); http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs index cc46642..76ec384 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs @@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using System; using System.Collections; using System.Collections.Generic; + using System.Linq; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Store; @@ -474,6 +475,36 @@ namespace Apache.Ignite.Core.Tests.Cache.Store _storeCount++; } + [Test] + public void TestLoadAll([Values(true, false)] bool isAsync) + { + var cache = GetCache(); + + var loadAll = isAsync + ? (Action<IEnumerable<int>, bool>) ((x, y) => { cache.LoadAllAsync(x, y).Wait(); }) + : cache.LoadAll; + + Assert.AreEqual(0, cache.GetSize()); + + loadAll(Enumerable.Range(105, 5), false); + + Assert.AreEqual(5, cache.GetSize()); + + for (int i = 105; i < 110; i++) + Assert.AreEqual("val_" + i, cache[i]); + + // Test overwrite + cache[105] = "42"; + + cache.LocalEvict(new[] { 105 }); + loadAll(new[] {105}, false); + Assert.AreEqual("42", cache[105]); + + loadAll(new[] {105, 106}, true); + Assert.AreEqual("val_105", cache[105]); + Assert.AreEqual("val_106", cache[106]); + } + /// <summary> /// Get's grid name for this test. 
/// </summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs index 9c381cb..b4b1670 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs @@ -100,7 +100,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { Debug.Assert(_grid != null); - return keys.OfType<object>().ToDictionary(key => key, Load); + return keys.OfType<object>().ToDictionary(key => key, key => "val_" + key); } public void Write(object key, object val) http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs index f5e7cd2..9d72cfa 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs @@ -161,6 +161,32 @@ namespace Apache.Ignite.Core.Cache Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args); /// <summary> + /// Loads the specified entries into the cache using the configured + /// <see cref="ICacheStore"/> for the given keys. + /// <para /> + /// If an entry for a key already exists in the cache, a value will be loaded if and only if + /// <paramref name="replaceExistingValues" /> is true. + /// If no loader is configured for the cache, no objects will be loaded. 
+ /// </summary> + /// <param name="keys">The keys to load.</param> + /// <param name="replaceExistingValues">if set to <c>true</c>, existing cache values will + /// be replaced by those loaded from a cache store.</param> + void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues); + + /// <summary> + /// Asynchronously loads the specified entries into the cache using the configured + /// <see cref="ICacheStore"/> for the given keys. + /// <para /> + /// If an entry for a key already exists in the cache, a value will be loaded if and only if + /// <paramref name="replaceExistingValues" /> is true. + /// If no loader is configured for the cache, no objects will be loaded. + /// </summary> + /// <param name="keys">The keys to load.</param> + /// <param name="replaceExistingValues">if set to <c>true</c>, existing cache values will + /// be replaced by those loaded from a cache store.</param> + Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues); + + /// <summary> /// Check if cache contains mapping for this key. 
/// </summary> /// <param name="key">Key.</param> http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index 1296596..266012f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -288,6 +288,23 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public void LoadAll(IEnumerable<TK> keys, bool replaceExistingValues) + { + LoadAllAsync(keys, replaceExistingValues).Wait(); + } + + /** <inheritDoc /> */ + public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues) + { + return GetFuture<object>((futId, futTyp) => DoOutOp((int) CacheOp.LoadAll, writer => + { + writer.WriteLong(futId); + writer.WriteBoolean(replaceExistingValues); + WriteEnumerable(writer, keys); + })).Task; + } + + /** <inheritDoc /> */ public bool ContainsKey(TK key) { IgniteArgumentCheck.NotNull(key, "key"); http://git-wip-us.apache.org/repos/asf/ignite/blob/fc9730a9/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs index 61ccb5f..4c42bf3 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs @@ -60,6 +60,7 @@ namespace Apache.Ignite.Core.Impl.Cache RemoveObj = 36, Replace2 = 37, Replace3 = 38, - GetConfig = 39 + GetConfig = 39, + LoadAll = 40 } } \ No newline at end of file
