http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs new file mode 100644 index 0000000..55bc76c --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs @@ -0,0 +1,1181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.Linq; + using System.Runtime.Serialization; + using System.Threading; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Event; + using Apache.Ignite.Core.Cache.Query; + using Apache.Ignite.Core.Cache.Query.Continuous; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl; + using Apache.Ignite.Core.Portable; + using Apache.Ignite.Core.Resource; + using NUnit.Framework; + using CQU = Apache.Ignite.Core.Impl.Cache.Query.Continuous.ContinuousQueryUtils; + + /// <summary> + /// Tests for continuous query. + /// </summary> + [SuppressMessage("ReSharper", "InconsistentNaming")] + [SuppressMessage("ReSharper", "PossibleNullReferenceException")] + [SuppressMessage("ReSharper", "StaticMemberInGenericType")] + public abstract class ContinuousQueryAbstractTest + { + /** Cache name: ATOMIC, backup. */ + protected const string CACHE_ATOMIC_BACKUP = "atomic_backup"; + + /** Cache name: ATOMIC, no backup. */ + protected const string CACHE_ATOMIC_NO_BACKUP = "atomic_no_backup"; + + /** Cache name: TRANSACTIONAL, backup. */ + protected const string CACHE_TX_BACKUP = "transactional_backup"; + + /** Cache name: TRANSACTIONAL, no backup. */ + protected const string CACHE_TX_NO_BACKUP = "transactional_no_backup"; + + /** Listener events. */ + public static BlockingCollection<CallbackEvent> CB_EVTS = new BlockingCollection<CallbackEvent>(); + + /** Listener events. */ + public static BlockingCollection<FilterEvent> FILTER_EVTS = new BlockingCollection<FilterEvent>(); + + /** First node. */ + private IIgnite grid1; + + /** Second node. */ + private IIgnite grid2; + + /** Cache on the first node. */ + private ICache<int, PortableEntry> cache1; + + /** Cache on the second node. */ + private ICache<int, PortableEntry> cache2; + + /** Cache name. */ + private readonly string cacheName; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="cacheName">Cache name.</param> + protected ContinuousQueryAbstractTest(string cacheName) + { + this.cacheName = cacheName; + } + + /// <summary> + /// Set-up routine. + /// </summary> + [TestFixtureSetUp] + public void SetUp() + { + GC.Collect(); + TestUtils.JvmDebug = true; + + IgniteConfigurationEx cfg = new IgniteConfigurationEx(); + + PortableConfiguration portCfg = new PortableConfiguration(); + + ICollection<PortableTypeConfiguration> portTypeCfgs = new List<PortableTypeConfiguration>(); + + portTypeCfgs.Add(new PortableTypeConfiguration(typeof(PortableEntry))); + portTypeCfgs.Add(new PortableTypeConfiguration(typeof(PortableFilter))); + portTypeCfgs.Add(new PortableTypeConfiguration(typeof(KeepPortableFilter))); + + portCfg.TypeConfigurations = portTypeCfgs; + + cfg.PortableConfiguration = portCfg; + cfg.JvmClasspath = TestUtils.CreateTestClasspath(); + cfg.JvmOptions = TestUtils.TestJavaOptions(); + cfg.SpringConfigUrl = "config\\cache-query-continuous.xml"; + + cfg.GridName = "grid-1"; + grid1 = Ignition.Start(cfg); + cache1 = grid1.Cache<int, PortableEntry>(cacheName); + + cfg.GridName = "grid-2"; + grid2 = Ignition.Start(cfg); + cache2 = grid2.Cache<int, PortableEntry>(cacheName); + } + + /// <summary> + /// Tear-down routine. + /// </summary> + [TestFixtureTearDown] + public void TearDown() + { + Ignition.StopAll(true); + } + + /// <summary> + /// Before-test routine. + /// </summary> + [SetUp] + public void BeforeTest() + { + CB_EVTS = new BlockingCollection<CallbackEvent>(); + FILTER_EVTS = new BlockingCollection<FilterEvent>(); + + AbstractFilter<PortableEntry>.res = true; + AbstractFilter<PortableEntry>.err = false; + AbstractFilter<PortableEntry>.marshErr = false; + AbstractFilter<PortableEntry>.unmarshErr = false; + + cache1.Remove(PrimaryKey(cache1)); + cache1.Remove(PrimaryKey(cache2)); + + Assert.AreEqual(0, cache1.Size()); + Assert.AreEqual(0, cache2.Size()); + + Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name); + } + + /// <summary> + /// Test arguments validation. + /// </summary> + [Test] + public void TestValidation() + { + Assert.Throws<ArgumentException>(() => { cache1.QueryContinuous(new ContinuousQuery<int, PortableEntry>(null)); }); + } + + /// <summary> + /// Test multiple closes. + /// </summary> + [Test] + public void TestMultipleClose() + { + int key1 = PrimaryKey(cache1); + int key2 = PrimaryKey(cache2); + + ContinuousQuery<int, PortableEntry> qry = + new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>()); + + IDisposable qryHnd; + + using (qryHnd = cache1.QueryContinuous(qry)) + { + // Put from local node. + cache1.GetAndPut(key1, Entry(key1)); + CheckCallbackSingle(key1, null, Entry(key1)); + + // Put from remote node. + cache2.GetAndPut(key2, Entry(key2)); + CheckCallbackSingle(key2, null, Entry(key2)); + } + + qryHnd.Dispose(); + } + + /// <summary> + /// Test regular callback operations. + /// </summary> + [Test] + public void TestCallback() + { + CheckCallback(false); + } + + /// <summary> + /// Check regular callback execution. + /// </summary> + /// <param name="loc"></param> + protected void CheckCallback(bool loc) + { + int key1 = PrimaryKey(cache1); + int key2 = PrimaryKey(cache2); + + ContinuousQuery<int, PortableEntry> qry = loc ? + new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>(), true) : + new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>()); + + using (cache1.QueryContinuous(qry)) + { + // Put from local node. + cache1.GetAndPut(key1, Entry(key1)); + CheckCallbackSingle(key1, null, Entry(key1)); + + cache1.GetAndPut(key1, Entry(key1 + 1)); + CheckCallbackSingle(key1, Entry(key1), Entry(key1 + 1)); + + cache1.Remove(key1); + CheckCallbackSingle(key1, Entry(key1 + 1), null); + + // Put from remote node. + cache2.GetAndPut(key2, Entry(key2)); + + if (loc) + CheckNoCallback(100); + else + CheckCallbackSingle(key2, null, Entry(key2)); + + cache1.GetAndPut(key2, Entry(key2 + 1)); + + if (loc) + CheckNoCallback(100); + else + CheckCallbackSingle(key2, Entry(key2), Entry(key2 + 1)); + + cache1.Remove(key2); + + if (loc) + CheckNoCallback(100); + else + CheckCallbackSingle(key2, Entry(key2 + 1), null); + } + + cache1.Put(key1, Entry(key1)); + CheckNoCallback(100); + + cache1.Put(key2, Entry(key2)); + CheckNoCallback(100); + } + + /// <summary> + /// Test Ignite injection into callback. + /// </summary> + [Test] + public void TestCallbackInjection() + { + Listener<PortableEntry> cb = new Listener<PortableEntry>(); + + Assert.IsNull(cb.ignite); + + using (cache1.QueryContinuous(new ContinuousQuery<int, PortableEntry>(cb))) + { + Assert.IsNotNull(cb.ignite); + } + } + + /// <summary> + /// Test portable filter logic. + /// </summary> + [Test] + public void TestFilterPortable() + { + CheckFilter(true, false); + } + + /// <summary> + /// Test serializable filter logic. + /// </summary> + [Test] + public void TestFilterSerializable() + { + CheckFilter(false, false); + } + + /// <summary> + /// Check filter. + /// </summary> + /// <param name="portable">Portable.</param> + /// <param name="loc">Local cache flag.</param> + protected void CheckFilter(bool portable, bool loc) + { + ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>(); + ICacheEntryEventFilter<int, PortableEntry> filter = + portable ? (AbstractFilter<PortableEntry>)new PortableFilter() : new SerializableFilter(); + + ContinuousQuery<int, PortableEntry> qry = loc ? + new ContinuousQuery<int, PortableEntry>(lsnr, filter, true) : + new ContinuousQuery<int, PortableEntry>(lsnr, filter); + + using (cache1.QueryContinuous(qry)) + { + // Put from local node. + int key1 = PrimaryKey(cache1); + cache1.GetAndPut(key1, Entry(key1)); + CheckFilterSingle(key1, null, Entry(key1)); + CheckCallbackSingle(key1, null, Entry(key1)); + + // Put from remote node. + int key2 = PrimaryKey(cache2); + cache1.GetAndPut(key2, Entry(key2)); + + if (loc) + { + CheckNoFilter(key2); + CheckNoCallback(key2); + } + else + { + CheckFilterSingle(key2, null, Entry(key2)); + CheckCallbackSingle(key2, null, Entry(key2)); + } + + AbstractFilter<PortableEntry>.res = false; + + // Ignored put from local node. + cache1.GetAndPut(key1, Entry(key1 + 1)); + CheckFilterSingle(key1, Entry(key1), Entry(key1 + 1)); + CheckNoCallback(100); + + // Ignored put from remote node. + cache1.GetAndPut(key2, Entry(key2 + 1)); + + if (loc) + CheckNoFilter(100); + else + CheckFilterSingle(key2, Entry(key2), Entry(key2 + 1)); + + CheckNoCallback(100); + } + } + + /// <summary> + /// Test portable filter error during invoke. + /// </summary> + [Ignore("IGNITE-521")] + [Test] + public void TestFilterInvokeErrorPortable() + { + CheckFilterInvokeError(true); + } + + /// <summary> + /// Test serializable filter error during invoke. + /// </summary> + [Ignore("IGNITE-521")] + [Test] + public void TestFilterInvokeErrorSerializable() + { + CheckFilterInvokeError(false); + } + + /// <summary> + /// Check filter error handling logic during invoke. + /// </summary> + private void CheckFilterInvokeError(bool portable) + { + AbstractFilter<PortableEntry>.err = true; + + ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>(); + ICacheEntryEventFilter<int, PortableEntry> filter = + portable ? (AbstractFilter<PortableEntry>) new PortableFilter() : new SerializableFilter(); + + ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter); + + using (cache1.QueryContinuous(qry)) + { + // Put from local node. + try + { + cache1.GetAndPut(PrimaryKey(cache1), Entry(1)); + + Assert.Fail("Should not reach this place."); + } + catch (IgniteException) + { + // No-op. + } + catch (Exception) + { + Assert.Fail("Unexpected error."); + } + + // Put from remote node. + try + { + cache1.GetAndPut(PrimaryKey(cache2), Entry(1)); + + Assert.Fail("Should not reach this place."); + } + catch (IgniteException) + { + // No-op. + } + catch (Exception) + { + Assert.Fail("Unexpected error."); + } + } + } + + /// <summary> + /// Test portable filter marshalling error. + /// </summary> + [Test] + public void TestFilterMarshalErrorPortable() + { + CheckFilterMarshalError(true); + } + + /// <summary> + /// Test serializable filter marshalling error. + /// </summary> + [Test] + public void TestFilterMarshalErrorSerializable() + { + CheckFilterMarshalError(false); + } + + /// <summary> + /// Check filter marshal error handling. + /// </summary> + /// <param name="portable">Portable flag.</param> + private void CheckFilterMarshalError(bool portable) + { + AbstractFilter<PortableEntry>.marshErr = true; + + ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>(); + ICacheEntryEventFilter<int, PortableEntry> filter = + portable ? (AbstractFilter<PortableEntry>)new PortableFilter() : new SerializableFilter(); + + ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter); + + Assert.Throws<Exception>(() => + { + using (cache1.QueryContinuous(qry)) + { + // No-op. + } + }); + } + + /// <summary> + /// Test non-serializable filter error. + /// </summary> + [Test] + public void TestFilterNonSerializable() + { + CheckFilterNonSerializable(false); + } + + /// <summary> + /// Test non-serializable filter behavior. + /// </summary> + /// <param name="loc"></param> + protected void CheckFilterNonSerializable(bool loc) + { + AbstractFilter<PortableEntry>.unmarshErr = true; + + ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>(); + ICacheEntryEventFilter<int, PortableEntry> filter = new LocalFilter(); + + ContinuousQuery<int, PortableEntry> qry = loc + ? new ContinuousQuery<int, PortableEntry>(lsnr, filter, true) + : new ContinuousQuery<int, PortableEntry>(lsnr, filter); + + if (loc) + { + using (cache1.QueryContinuous(qry)) + { + // Local put must be fine. + int key1 = PrimaryKey(cache1); + cache1.GetAndPut(key1, Entry(key1)); + CheckFilterSingle(key1, null, Entry(key1)); + } + } + else + { + Assert.Throws<SerializationException>(() => + { + using (cache1.QueryContinuous(qry)) + { + // No-op. + } + }); + } + } + + /// <summary> + /// Test portable filter unmarshalling error. + /// </summary> + [Ignore("IGNITE-521")] + [Test] + public void TestFilterUnmarshalErrorPortable() + { + CheckFilterUnmarshalError(true); + } + + /// <summary> + /// Test serializable filter unmarshalling error. + /// </summary> + [Ignore("IGNITE-521")] + [Test] + public void TestFilterUnmarshalErrorSerializable() + { + CheckFilterUnmarshalError(false); + } + + /// <summary> + /// Check filter unmarshal error handling. + /// </summary> + /// <param name="portable">Portable flag.</param> + private void CheckFilterUnmarshalError(bool portable) + { + AbstractFilter<PortableEntry>.unmarshErr = true; + + ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>(); + ICacheEntryEventFilter<int, PortableEntry> filter = + portable ? (AbstractFilter<PortableEntry>)new PortableFilter() : new SerializableFilter(); + + ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter); + + using (cache1.QueryContinuous(qry)) + { + // Local put must be fine. + int key1 = PrimaryKey(cache1); + cache1.GetAndPut(key1, Entry(key1)); + CheckFilterSingle(key1, null, Entry(key1)); + + // Remote put must fail. + try + { + cache1.GetAndPut(PrimaryKey(cache2), Entry(1)); + + Assert.Fail("Should not reach this place."); + } + catch (IgniteException) + { + // No-op. + } + catch (Exception) + { + Assert.Fail("Unexpected error."); + } + } + } + + /// <summary> + /// Test Ignite injection into filters. + /// </summary> + [Test] + public void TestFilterInjection() + { + Listener<PortableEntry> cb = new Listener<PortableEntry>(); + PortableFilter filter = new PortableFilter(); + + Assert.IsNull(filter.ignite); + + using (cache1.QueryContinuous(new ContinuousQuery<int, PortableEntry>(cb, filter))) + { + // Local injection. + Assert.IsNotNull(filter.ignite); + + // Remote injection. + cache1.GetAndPut(PrimaryKey(cache2), Entry(1)); + + FilterEvent evt; + + Assert.IsTrue(FILTER_EVTS.TryTake(out evt, 500)); + + Assert.IsNotNull(evt.ignite); + } + } + + + /// <summary> + /// Test "keep-portable" scenario. + /// </summary> + [Test] + public void TestKeepPortable() + { + var cache = cache1.WithKeepPortable<int, IPortableObject>(); + + ContinuousQuery<int, IPortableObject> qry = new ContinuousQuery<int, IPortableObject>( + new Listener<IPortableObject>(), new KeepPortableFilter()); + + using (cache.QueryContinuous(qry)) + { + // 1. Local put. + cache1.GetAndPut(PrimaryKey(cache1), Entry(1)); + + CallbackEvent cbEvt; + FilterEvent filterEvt; + + Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500)); + Assert.AreEqual(PrimaryKey(cache1), filterEvt.entry.Key); + Assert.AreEqual(null, filterEvt.entry.OldValue); + Assert.AreEqual(Entry(1), (filterEvt.entry.Value as IPortableObject) + .Deserialize<PortableEntry>()); + + Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500)); + Assert.AreEqual(1, cbEvt.entries.Count); + Assert.AreEqual(PrimaryKey(cache1), cbEvt.entries.First().Key); + Assert.AreEqual(null, cbEvt.entries.First().OldValue); + Assert.AreEqual(Entry(1), (cbEvt.entries.First().Value as IPortableObject) + .Deserialize<PortableEntry>()); + + // 2. Remote put. + cache1.GetAndPut(PrimaryKey(cache2), Entry(2)); + + Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500)); + Assert.AreEqual(PrimaryKey(cache2), filterEvt.entry.Key); + Assert.AreEqual(null, filterEvt.entry.OldValue); + Assert.AreEqual(Entry(2), (filterEvt.entry.Value as IPortableObject) + .Deserialize<PortableEntry>()); + + Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500)); + Assert.AreEqual(1, cbEvt.entries.Count); + Assert.AreEqual(PrimaryKey(cache2), cbEvt.entries.First().Key); + Assert.AreEqual(null, cbEvt.entries.First().OldValue); + Assert.AreEqual(Entry(2), + (cbEvt.entries.First().Value as IPortableObject).Deserialize<PortableEntry>()); + } + } + + /// <summary> + /// Test whether buffer size works fine. + /// </summary> + [Test] + public void TestBufferSize() + { + // Put two remote keys in advance. + List<int> rmtKeys = PrimaryKeys(cache2, 2); + + ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>()); + + qry.BufferSize = 2; + qry.TimeInterval = TimeSpan.FromMilliseconds(1000000); + + using (cache1.QueryContinuous(qry)) + { + qry.BufferSize = 2; + + cache1.GetAndPut(rmtKeys[0], Entry(rmtKeys[0])); + + CheckNoCallback(100); + + cache1.GetAndPut(rmtKeys[1], Entry(rmtKeys[1])); + + CallbackEvent evt; + + Assert.IsTrue(CB_EVTS.TryTake(out evt, 1000)); + + Assert.AreEqual(2, evt.entries.Count); + + var entryRmt0 = evt.entries.Single(entry => { return entry.Key.Equals(rmtKeys[0]); }); + var entryRmt1 = evt.entries.Single(entry => { return entry.Key.Equals(rmtKeys[1]); }); + + Assert.AreEqual(rmtKeys[0], entryRmt0.Key); + Assert.IsNull(entryRmt0.OldValue); + Assert.AreEqual(Entry(rmtKeys[0]), entryRmt0.Value); + + Assert.AreEqual(rmtKeys[1], entryRmt1.Key); + Assert.IsNull(entryRmt1.OldValue); + Assert.AreEqual(Entry(rmtKeys[1]), entryRmt1.Value); + } + + cache1.Remove(rmtKeys[0]); + cache1.Remove(rmtKeys[1]); + } + + /// <summary> + /// Test whether timeout works fine. + /// </summary> + [Test] + public void TestTimeout() + { + int key1 = PrimaryKey(cache1); + int key2 = PrimaryKey(cache2); + + ContinuousQuery<int, PortableEntry> qry = + new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>()); + + qry.BufferSize = 2; + qry.TimeInterval = TimeSpan.FromMilliseconds(500); + + using (cache1.QueryContinuous(qry)) + { + // Put from local node. + cache1.GetAndPut(key1, Entry(key1)); + CheckCallbackSingle(key1, null, Entry(key1)); + + // Put from remote node. + cache1.GetAndPut(key2, Entry(key2)); + CheckNoCallback(100); + CheckCallbackSingle(key2, null, Entry(key2), 1000); + } + } + + /// <summary> + /// Test whether nested Ignite API call from callback works fine. + /// </summary> + [Test] + public void TestNestedCallFromCallback() + { + var cache = cache1.WithKeepPortable<int, IPortableObject>(); + + int key = PrimaryKey(cache1); + + NestedCallListener cb = new NestedCallListener(); + + using (cache.QueryContinuous(new ContinuousQuery<int, IPortableObject>(cb))) + { + cache1.GetAndPut(key, Entry(key)); + + cb.countDown.Wait(); + } + + cache.Remove(key); + } + + /// <summary> + /// Tests the initial query. + /// </summary> + [Test] + public void TestInitialQuery() + { + // Scan query, GetAll + TestInitialQuery(new ScanQuery<int, PortableEntry>(new InitialQueryScanFilter()), cur => cur.GetAll()); + + // Scan query, iterator + TestInitialQuery(new ScanQuery<int, PortableEntry>(new InitialQueryScanFilter()), cur => cur.ToList()); + + // Sql query, GetAll + TestInitialQuery(new SqlQuery(typeof(PortableEntry), "val < 33"), cur => cur.GetAll()); + + // Sql query, iterator + TestInitialQuery(new SqlQuery(typeof(PortableEntry), "val < 33"), cur => cur.ToList()); + + // Text query, GetAll + TestInitialQuery(new TextQuery(typeof(PortableEntry), "1*"), cur => cur.GetAll()); + + // Text query, iterator + TestInitialQuery(new TextQuery(typeof(PortableEntry), "1*"), cur => cur.ToList()); + + // Test exception: invalid initial query + var ex = Assert.Throws<IgniteException>( + () => TestInitialQuery(new TextQuery(typeof (PortableEntry), "*"), cur => cur.GetAll())); + + Assert.AreEqual("Cannot parse '*': '*' or '?' not allowed as first character in WildcardQuery", ex.Message); + } + + /// <summary> + /// Tests the initial query. + /// </summary> + private void TestInitialQuery(QueryBase initialQry, Func<IQueryCursor<ICacheEntry<int, PortableEntry>>, + IEnumerable<ICacheEntry<int, PortableEntry>>> getAllFunc) + { + var qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>()); + + cache1.Put(11, Entry(11)); + cache1.Put(12, Entry(12)); + cache1.Put(33, Entry(33)); + + try + { + IContinuousQueryHandle<ICacheEntry<int, PortableEntry>> contQry; + + using (contQry = cache1.QueryContinuous(qry, initialQry)) + { + // Check initial query + var initialEntries = + getAllFunc(contQry.GetInitialQueryCursor()).Distinct().OrderBy(x => x.Key).ToList(); + + Assert.Throws<InvalidOperationException>(() => contQry.GetInitialQueryCursor()); + + Assert.AreEqual(2, initialEntries.Count); + + for (int i = 0; i < initialEntries.Count; i++) + { + Assert.AreEqual(i + 11, initialEntries[i].Key); + Assert.AreEqual(i + 11, initialEntries[i].Value.val); + } + + // Check continuous query + cache1.Put(44, Entry(44)); + CheckCallbackSingle(44, null, Entry(44)); + } + + Assert.Throws<ObjectDisposedException>(() => contQry.GetInitialQueryCursor()); + + contQry.Dispose(); // multiple dispose calls are ok + } + finally + { + cache1.Clear(); + } + } + + /// <summary> + /// Check single filter event. + /// </summary> + /// <param name="expKey">Expected key.</param> + /// <param name="expOldVal">Expected old value.</param> + /// <param name="expVal">Expected value.</param> + private void CheckFilterSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal) + { + CheckFilterSingle(expKey, expOldVal, expVal, 1000); + } + + /// <summary> + /// Check single filter event. + /// </summary> + /// <param name="expKey">Expected key.</param> + /// <param name="expOldVal">Expected old value.</param> + /// <param name="expVal">Expected value.</param> + /// <param name="timeout">Timeout.</param> + private void CheckFilterSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal, int timeout) + { + FilterEvent evt; + + Assert.IsTrue(FILTER_EVTS.TryTake(out evt, timeout)); + + Assert.AreEqual(expKey, evt.entry.Key); + Assert.AreEqual(expOldVal, evt.entry.OldValue); + Assert.AreEqual(expVal, evt.entry.Value); + } + + /// <summary> + /// Ensure that no filter events are logged. + /// </summary> + /// <param name="timeout">Timeout.</param> + private void CheckNoFilter(int timeout) + { + FilterEvent evt; + + Assert.IsFalse(FILTER_EVTS.TryTake(out evt, timeout)); + } + + /// <summary> + /// Check single callback event. + /// </summary> + /// <param name="expKey">Expected key.</param> + /// <param name="expOldVal">Expected old value.</param> + /// <param name="expVal">Expected new value.</param> + private void CheckCallbackSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal) + { + CheckCallbackSingle(expKey, expOldVal, expVal, 1000); + } + + /// <summary> + /// Check single callback event. + /// </summary> + /// <param name="expKey">Expected key.</param> + /// <param name="expOldVal">Expected old value.</param> + /// <param name="expVal">Expected new value.</param> + /// <param name="timeout">Timeout.</param> + private void CheckCallbackSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal, int timeout) + { + CallbackEvent evt; + + Assert.IsTrue(CB_EVTS.TryTake(out evt, timeout)); + + Assert.AreEqual(1, evt.entries.Count); + + Assert.AreEqual(expKey, evt.entries.First().Key); + Assert.AreEqual(expOldVal, evt.entries.First().OldValue); + Assert.AreEqual(expVal, evt.entries.First().Value); + } + + /// <summary> + /// Ensure that no callback events are logged. + /// </summary> + /// <param name="timeout">Timeout.</param> + private void CheckNoCallback(int timeout) + { + CallbackEvent evt; + + Assert.IsFalse(CB_EVTS.TryTake(out evt, timeout)); + } + + /// <summary> + /// Craate entry. + /// </summary> + /// <param name="val">Value.</param> + /// <returns>Entry.</returns> + private static PortableEntry Entry(int val) + { + return new PortableEntry(val); + } + + /// <summary> + /// Get primary key for cache. + /// </summary> + /// <param name="cache">Cache.</param> + /// <returns>Primary key.</returns> + private static int PrimaryKey<T>(ICache<int, T> cache) + { + return PrimaryKeys(cache, 1)[0]; + } + + /// <summary> + /// Get primary keys for cache. + /// </summary> + /// <param name="cache">Cache.</param> + /// <param name="cnt">Amount of keys.</param> + /// <param name="startFrom">Value to start from.</param> + /// <returns></returns> + private static List<int> PrimaryKeys<T>(ICache<int, T> cache, int cnt, int startFrom = 0) + { + IClusterNode node = cache.Ignite.Cluster.LocalNode; + + ICacheAffinity aff = cache.Ignite.Affinity(cache.Name); + + List<int> keys = new List<int>(cnt); + + for (int i = startFrom; i < startFrom + 100000; i++) + { + if (aff.IsPrimary(node, i)) + { + keys.Add(i); + + if (keys.Count == cnt) + return keys; + } + } + + Assert.Fail("Failed to find " + cnt + " primary keys."); + + return null; + } + + /// <summary> + /// Portable entry. + /// </summary> + public class PortableEntry + { + /** Value. */ + public readonly int val; + + /** <inheritDot /> */ + public override int GetHashCode() + { + return val; + } + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="val">Value.</param> + public PortableEntry(int val) + { + this.val = val; + } + + /** <inheritDoc /> */ + public override bool Equals(object obj) + { + return obj != null && obj is PortableEntry && ((PortableEntry)obj).val == val; + } + } + + /// <summary> + /// Abstract filter. + /// </summary> + [Serializable] + public abstract class AbstractFilter<V> : ICacheEntryEventFilter<int, V> + { + /** Result. */ + public static volatile bool res = true; + + /** Throw error on invocation. */ + public static volatile bool err; + + /** Throw error during marshalling. */ + public static volatile bool marshErr; + + /** Throw error during unmarshalling. */ + public static volatile bool unmarshErr; + + /** Grid. */ + [InstanceResource] + public IIgnite ignite; + + /** <inheritDoc /> */ + public bool Evaluate(ICacheEntryEvent<int, V> evt) + { + if (err) + throw new Exception("Filter error."); + + FILTER_EVTS.Add(new FilterEvent(ignite, + CQU.CreateEvent<object, object>(evt.Key, evt.OldValue, evt.Value))); + + return res; + } + } + + /// <summary> + /// Filter which cannot be serialized. + /// </summary> + public class LocalFilter : AbstractFilter<PortableEntry> + { + // No-op. + } + + /// <summary> + /// Portable filter. + /// </summary> + public class PortableFilter : AbstractFilter<PortableEntry>, IPortableMarshalAware + { + /** <inheritDoc /> */ + public void WritePortable(IPortableWriter writer) + { + if (marshErr) + throw new Exception("Filter marshalling error."); + } + + /** <inheritDoc /> */ + public void ReadPortable(IPortableReader reader) + { + if (unmarshErr) + throw new Exception("Filter unmarshalling error."); + } + } + + /// <summary> + /// Serializable filter. + /// </summary> + [Serializable] + public class SerializableFilter : AbstractFilter<PortableEntry>, ISerializable + { + /// <summary> + /// Constructor. + /// </summary> + public SerializableFilter() + { + // No-op. + } + + /// <summary> + /// Serialization constructor. + /// </summary> + /// <param name="info">Info.</param> + /// <param name="context">Context.</param> + protected SerializableFilter(SerializationInfo info, StreamingContext context) + { + if (unmarshErr) + throw new Exception("Filter unmarshalling error."); + } + + /** <inheritDoc /> */ + public void GetObjectData(SerializationInfo info, StreamingContext context) + { + if (marshErr) + throw new Exception("Filter marshalling error."); + } + } + + /// <summary> + /// Filter for "keep-portable" scenario. + /// </summary> + public class KeepPortableFilter : AbstractFilter<IPortableObject> + { + // No-op. + } + + /// <summary> + /// Listener. + /// </summary> + public class Listener<V> : ICacheEntryEventListener<int, V> + { + [InstanceResource] + public IIgnite ignite; + + /** <inheritDoc /> */ + public void OnEvent(IEnumerable<ICacheEntryEvent<int, V>> evts) + { + ICollection<ICacheEntryEvent<object, object>> entries0 = + new List<ICacheEntryEvent<object, object>>(); + + foreach (ICacheEntryEvent<int, V> evt in evts) + entries0.Add(CQU.CreateEvent<object, object>(evt.Key, evt.OldValue, evt.Value)); + + CB_EVTS.Add(new CallbackEvent(entries0)); + } + } + + /// <summary> + /// Listener with nested Ignite API call. + /// </summary> + public class NestedCallListener : ICacheEntryEventListener<int, IPortableObject> + { + /** Event. */ + public readonly CountdownEvent countDown = new CountdownEvent(1); + + public void OnEvent(IEnumerable<ICacheEntryEvent<int, IPortableObject>> evts) + { + foreach (ICacheEntryEvent<int, IPortableObject> evt in evts) + { + IPortableObject val = evt.Value; + + IPortableMetadata meta = val.Metadata(); + + Assert.AreEqual(typeof(PortableEntry).Name, meta.TypeName); + } + + countDown.Signal(); + } + } + + /// <summary> + /// Filter event. + /// </summary> + public class FilterEvent + { + /** Grid. */ + public IIgnite ignite; + + /** Entry. */ + public ICacheEntryEvent<object, object> entry; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="ignite">Grid.</param> + /// <param name="entry">Entry.</param> + public FilterEvent(IIgnite ignite, ICacheEntryEvent<object, object> entry) + { + this.ignite = ignite; + this.entry = entry; + } + } + + /// <summary> + /// Callbakc event. + /// </summary> + public class CallbackEvent + { + /** Entries. */ + public ICollection<ICacheEntryEvent<object, object>> entries; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="entries">Entries.</param> + public CallbackEvent(ICollection<ICacheEntryEvent<object, object>> entries) + { + this.entries = entries; + } + } + + /// <summary> + /// ScanQuery filter for InitialQuery test. + /// </summary> + [Serializable] + private class InitialQueryScanFilter : ICacheEntryFilter<int, PortableEntry> + { + /** <inheritdoc /> */ + public bool Invoke(ICacheEntry<int, PortableEntry> entry) + { + return entry.Key < 33; + } + } + } +} +
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs new file mode 100644 index 0000000..ac44f10 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous +{ + /// <summary> + /// Continuous query tests for ATOMIC cache with backups. + /// </summary> + public class ContinuousQueryAtomiclBackupTest : ContinuousQueryAbstractTest + { + /// <summary> + /// Constructor. + /// </summary> + public ContinuousQueryAtomiclBackupTest() : base(CACHE_ATOMIC_BACKUP) + { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs new file mode 100644 index 0000000..8e1a18f --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous +{ + /// <summary> + /// Continuous query tests for ATOMIC cache with no backups. + /// </summary> + public class ContinuousQueryAtomiclNoBackupTest : ContinuousQueryNoBackupAbstractTest + { + /// <summary> + /// Constructor. + /// </summary> + public ContinuousQueryAtomiclNoBackupTest() + : base(CACHE_ATOMIC_NO_BACKUP) + { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs new file mode 100644 index 0000000..aa7d627 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous +{ + using NUnit.Framework; + + /// <summary> + /// Tests for ocntinuous query when there are no backups. + /// </summary> + public abstract class ContinuousQueryNoBackupAbstractTest : ContinuousQueryAbstractTest + { + /// <summary> + /// Constructor. + /// </summary> + /// <param name="cacheName">Cache name.</param> + protected ContinuousQueryNoBackupAbstractTest(string cacheName) : base(cacheName) + { + // No-op. + } + + /// <summary> + /// Test regular callback operations for local query. + /// </summary> + [Test] + public void TestCallbackLocal() + { + CheckCallback(true); + } + + /// <summary> + /// Test portable filter logic. + /// </summary> + [Test] + public void TestFilterPortableLocal() + { + CheckFilter(true, true); + } + + /// <summary> + /// Test serializable filter logic. + /// </summary> + [Test] + public void TestFilterSerializableLocal() + { + CheckFilter(false, true); + } + + /// <summary> + /// Test non-serializable filter for local query. + /// </summary> + [Test] + public void TestFilterNonSerializableLocal() + { + CheckFilterNonSerializable(true); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs new file mode 100644 index 0000000..08ae88c --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous +{ + /// <summary> + /// Continuous query tests for TRANSACTIONAL cache with backups. + /// </summary> + public class ContinuousQueryTransactionalBackupTest : ContinuousQueryAbstractTest + { + /// <summary> + /// Constructor. + /// </summary> + public ContinuousQueryTransactionalBackupTest() + : base(CACHE_TX_BACKUP) + { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs new file mode 100644 index 0000000..685f7b4 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous +{ + /// <summary> + /// Continuous query tests for TRANSACTIONAL cache with no backups. + /// </summary> + public class ContinuousQueryTransactionalNoBackupTest : ContinuousQueryNoBackupAbstractTest + { + /// <summary> + /// Constructor. + /// </summary> + public ContinuousQueryTransactionalNoBackupTest() : base(CACHE_TX_NO_BACKUP) + { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs new file mode 100644 index 0000000..33eec7b --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Store +{ + using System; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Portable; + using NUnit.Framework; + + /// <summary> + /// Tests for GridCacheParallelLoadStoreAdapter. + /// </summary> + public class CacheParallelLoadStoreTest + { + // object store name + private const string ObjectStoreCacheName = "object_store_parallel"; + + /// <summary> + /// Set up test class. + /// </summary> + [TestFixtureSetUp] + public virtual void BeforeTests() + { + TestUtils.KillProcesses(); + TestUtils.JvmDebug = true; + + Ignition.Start(new IgniteConfiguration + { + JvmClasspath = TestUtils.CreateTestClasspath(), + JvmOptions = TestUtils.TestJavaOptions(), + SpringConfigUrl = "config\\native-client-test-cache-parallel-store.xml", + PortableConfiguration = new PortableConfiguration + { + Types = new[] {typeof (CacheTestParallelLoadStore.Record).FullName} + } + }); + } + + /// <summary> + /// Tear down test class. + /// </summary> + [TestFixtureTearDown] + public virtual void AfterTests() + { + Ignition.StopAll(true); + } + + /// <summary> + /// Test setup. + /// </summary> + [SetUp] + public void BeforeTest() + { + Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name); + } + + /// <summary> + /// Tests the LoadCache. + /// </summary> + [Test] + public void TestLoadCache() + { + var cache = GetCache(); + + Assert.AreEqual(0, cache.Size()); + + const int minId = 113; + const int expectedItemCount = CacheTestParallelLoadStore.InputDataLength - minId; + + CacheTestParallelLoadStore.ResetCounters(); + + cache.LocalLoadCache(null, minId); + + Assert.AreEqual(expectedItemCount, cache.Size()); + + // check items presence; increment by 100 to speed up the test + for (var i = minId; i < expectedItemCount; i += 100) + { + var rec = cache.Get(i); + Assert.AreEqual(i, rec.Id); + } + + // check that items were processed in parallel + Assert.GreaterOrEqual(CacheTestParallelLoadStore.UniqueThreadCount, Environment.ProcessorCount); + } + + /// <summary> + /// Gets the cache. + /// </summary> + private static ICache<int, CacheTestParallelLoadStore.Record> GetCache() + { + return Ignition.GetIgnite().Cache<int, CacheTestParallelLoadStore.Record>(ObjectStoreCacheName); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs new file mode 100644 index 0000000..bc55901 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Store +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Cache.Store; + using Apache.Ignite.Core.Impl; + using Apache.Ignite.Core.Resource; + using NUnit.Framework; + + /// <summary> + /// Tests for store session. + /// </summary> + public class CacheStoreSessionTest + { + /** Grid name. */ + private const string IgniteName = "grid"; + + /** Cache 1 name. */ + private const string Cache1 = "cache1"; + + /** Cache 2 name. */ + private const string Cache2 = "cache2"; + + /** Operations. */ + private static ConcurrentBag<ICollection<Operation>> _dumps; + + /// <summary> + /// Set up routine. + /// </summary> + [TestFixtureSetUp] + public virtual void BeforeTests() + { + //TestUtils.JVM_DEBUG = true; + + TestUtils.KillProcesses(); + + TestUtils.JvmDebug = true; + + IgniteConfigurationEx cfg = new IgniteConfigurationEx + { + GridName = IgniteName, + JvmClasspath = TestUtils.CreateTestClasspath(), + JvmOptions = TestUtils.TestJavaOptions(), + SpringConfigUrl = @"config\cache\store\cache-store-session.xml" + }; + + + Ignition.Start(cfg); + } + + /// <summary> + /// Tear down routine. + /// </summary> + [TestFixtureTearDown] + public virtual void AfterTests() + { + Ignition.StopAll(true); + } + + /// <summary> + /// Test basic session API. + /// </summary> + [Test] + public void TestSession() + { + _dumps = new ConcurrentBag<ICollection<Operation>>(); + + var ignite = Ignition.GetIgnite(IgniteName); + + var cache1 = Ignition.GetIgnite(IgniteName).Cache<int, int>(Cache1); + var cache2 = Ignition.GetIgnite(IgniteName).Cache<int, int>(Cache2); + + // 1. Test rollback. + using (var tx = ignite.Transactions.TxStart()) + { + cache1.Put(1, 1); + cache2.Put(2, 2); + + tx.Rollback(); + } + + Assert.AreEqual(1, _dumps.Count); + var ops = _dumps.First(); + Assert.AreEqual(1, ops.Count); + + Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && !op.Commit)); + + _dumps = new ConcurrentBag<ICollection<Operation>>(); + + // 2. Test puts. + using (var tx = ignite.Transactions.TxStart()) + { + cache1.Put(1, 1); + cache2.Put(2, 2); + + tx.Commit(); + } + + Assert.AreEqual(1, _dumps.Count); + ops = _dumps.First(); + Assert.AreEqual(3, ops.Count); + + Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache1.Equals(op.CacheName) && 1.Equals(op.Key) && 1.Equals(op.Value))); + Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache2.Equals(op.CacheName) && 2.Equals(op.Key) && 2.Equals(op.Value))); + Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit)); + + _dumps = new ConcurrentBag<ICollection<Operation>>(); + + // 3. Test removes. + using (var tx = ignite.Transactions.TxStart()) + { + cache1.Remove(1); + cache2.Remove(2); + + tx.Commit(); + } + + Assert.AreEqual(1, _dumps.Count); + ops = _dumps.First(); + Assert.AreEqual(3, ops.Count); + + Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache1.Equals(op.CacheName) && 1.Equals(op.Key))); + Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache2.Equals(op.CacheName) && 2.Equals(op.Key))); + Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit)); + } + + /// <summary> + /// Dump operations. + /// </summary> + /// <param name="dump">Dump.</param> + internal static void DumpOperations(ICollection<Operation> dump) + { + _dumps.Add(dump); + } + + /// <summary> + /// Test store implementation. + /// </summary> + public class Store : CacheStoreAdapter + { + /** Store session. */ + [StoreSessionResource] +#pragma warning disable 649 + private ICacheStoreSession _ses; +#pragma warning restore 649 + + /** <inheritdoc /> */ + public override object Load(object key) + { + throw new NotImplementedException(); + } + + /** <inheritdoc /> */ + public override void Write(object key, object val) + { + GetOperations().Add(new Operation(_ses.CacheName, OperationType.Write, (int)key, (int)val)); + } + + /** <inheritdoc /> */ + public override void Delete(object key) + { + GetOperations().Add(new Operation(_ses.CacheName, OperationType.Delete, (int)key, 0)); + } + + /** <inheritdoc /> */ + public override void SessionEnd(bool commit) + { + Operation op = new Operation(_ses.CacheName, OperationType.SesEnd) { Commit = commit }; + + ICollection<Operation> ops = GetOperations(); + + ops.Add(op); + + DumpOperations(ops); + } + + /// <summary> + /// Get collection with operations. + /// </summary> + /// <returns>Operations.</returns> + private ICollection<Operation> GetOperations() + { + object ops; + + if (!_ses.Properties.TryGetValue("ops", out ops)) + { + ops = new List<Operation>(); + + _ses.Properties["ops"] = ops; + } + + return (ICollection<Operation>) ops; + } + } + + /// <summary> + /// Logged operation. + /// </summary> + internal class Operation + { + /// <summary> + /// Constructor. + /// </summary> + /// <param name="cacheName">Cache name.</param> + /// <param name="type">Operation type.</param> + public Operation(string cacheName, OperationType type) + { + CacheName = cacheName; + Type = type; + } + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="cacheName">Cache name.</param> + /// <param name="type">Operation type.</param> + /// <param name="key">Key.</param> + /// <param name="val">Value.</param> + public Operation(string cacheName, OperationType type, int key, int val) : this(cacheName, type) + { + Key = key; + Value = val; + } + + /// <summary> + /// Cache name. + /// </summary> + public string CacheName { get; set; } + + /// <summary> + /// Operation type. + /// </summary> + public OperationType Type { get; set; } + + /// <summary> + /// Key. + /// </summary> + public int Key { get; set; } + + /// <summary> + /// Value. + /// </summary> + public int Value { get; set; } + + /// <summary> + /// Commit flag. + /// </summary> + public bool Commit { get; set; } + } + + /// <summary> + /// Operation types. + /// </summary> + internal enum OperationType + { + /** Write. */ + Write, + + /** Delete. */ + Delete, + + /** Session end. */ + SesEnd + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs new file mode 100644 index 0000000..4e5e050 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Store +{ + using System; + using System.Collections; + using System.Collections.Generic; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Impl; + using Apache.Ignite.Core.Portable; + using NUnit.Framework; + + /// <summary> + /// + /// </summary> + class Key + { + private readonly int _idx; + + public Key(int idx) + { + _idx = idx; + } + + public int Index() + { + return _idx; + } + + public override bool Equals(object obj) + { + if (obj == null || obj.GetType() != GetType()) + return false; + + Key key = (Key)obj; + + return key._idx == _idx; + } + + public override int GetHashCode() + { + return _idx; + } + } + + /// <summary> + /// + /// </summary> + class Value + { + private int _idx; + + public Value(int idx) + { + _idx = idx; + } + + public int Index() + { + return _idx; + } + } + + /// <summary> + /// Cache entry predicate. + /// </summary> + [Serializable] + public class CacheEntryFilter : ICacheEntryFilter<int, string> + { + /** <inheritdoc /> */ + public bool Invoke(ICacheEntry<int, string> entry) + { + return entry.Key >= 105; + } + } + + /// <summary> + /// + /// </summary> + public class CacheStoreTest + { + /** */ + private const string PortableStoreCacheName = "portable_store"; + + /** */ + private const string ObjectStoreCacheName = "object_store"; + + /** */ + private const string CustomStoreCacheName = "custom_store"; + + /** */ + private const string TemplateStoreCacheName = "template_store*"; + + /// <summary> + /// + /// </summary> + [TestFixtureSetUp] + public void BeforeTests() + { + //TestUtils.JVM_DEBUG = true; + + TestUtils.KillProcesses(); + + TestUtils.JvmDebug = true; + + IgniteConfigurationEx cfg = new IgniteConfigurationEx(); + + cfg.GridName = GridName(); + cfg.JvmClasspath = TestUtils.CreateTestClasspath(); + cfg.JvmOptions = TestUtils.TestJavaOptions(); + cfg.SpringConfigUrl = "config\\native-client-test-cache-store.xml"; + + PortableConfiguration portCfg = new PortableConfiguration(); + + portCfg.Types = new List<string> { typeof(Key).FullName, typeof(Value).FullName }; + + cfg.PortableConfiguration = portCfg; + + Ignition.Start(cfg); + } + + /// <summary> + /// + /// </summary> + [TestFixtureTearDown] + public virtual void AfterTests() + { + Ignition.StopAll(true); + } + + /// <summary> + /// + /// </summary> + [SetUp] + public void BeforeTest() + { + Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name); + } + + /// <summary> + /// + /// </summary> + [TearDown] + public void AfterTest() + { + var cache = Cache(); + + cache.Clear(); + + Assert.IsTrue(cache.IsEmpty, "Cache is not empty: " + cache.Size()); + + CacheTestStore.Reset(); + + Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name); + } + + [Test] + public void TestLoadCache() + { + var cache = Cache(); + + Assert.AreEqual(0, cache.Size()); + + cache.LoadCache(new CacheEntryFilter(), 100, 10); + + Assert.AreEqual(5, cache.Size()); + + for (int i = 105; i < 110; i++) + Assert.AreEqual("val_" + i, cache.Get(i)); + } + + [Test] + public void TestLocalLoadCache() + { + var cache = Cache(); + + Assert.AreEqual(0, cache.Size()); + + cache.LocalLoadCache(new CacheEntryFilter(), 100, 10); + + Assert.AreEqual(5, cache.Size()); + + for (int i = 105; i < 110; i++) + Assert.AreEqual("val_" + i, cache.Get(i)); + } + + [Test] + public void TestLoadCacheMetadata() + { + CacheTestStore.LoadObjects = true; + + var cache = Cache(); + + Assert.AreEqual(0, cache.Size()); + + cache.LocalLoadCache(null, 0, 3); + + Assert.AreEqual(3, cache.Size()); + + var meta = cache.WithKeepPortable<Key, IPortableObject>().Get(new Key(0)).Metadata(); + + Assert.NotNull(meta); + + Assert.AreEqual("Value", meta.TypeName); + } + + [Test] + public void TestLoadCacheAsync() + { + var cache = Cache().WithAsync(); + + Assert.AreEqual(0, cache.Size()); + + cache.LocalLoadCache(new CacheEntryFilter(), 100, 10); + + var fut = cache.GetFuture<object>(); + + fut.Get(); + + Assert.IsTrue(fut.IsDone); + + cache.Size(); + Assert.AreEqual(5, cache.GetFuture<int>().ToTask().Result); + + for (int i = 105; i < 110; i++) + { + cache.Get(i); + + Assert.AreEqual("val_" + i, cache.GetFuture<string>().ToTask().Result); + } + } + + [Test] + public void TestPutLoad() + { + var cache = Cache(); + + cache.Put(1, "val"); + + IDictionary map = StoreMap(); + + Assert.AreEqual(1, map.Count); + + cache.LocalEvict(new[] { 1 }); + + Assert.AreEqual(0, cache.Size()); + + Assert.AreEqual("val", cache.Get(1)); + + Assert.AreEqual(1, cache.Size()); + } + + [Test] + public void TestPutLoadPortables() + { + var cache = PortableStoreCache<int, Value>(); + + cache.Put(1, new Value(1)); + + IDictionary map = StoreMap(); + + Assert.AreEqual(1, map.Count); + + IPortableObject v = (IPortableObject)map[1]; + + Assert.AreEqual(1, v.Field<int>("_idx")); + + cache.LocalEvict(new[] { 1 }); + + Assert.AreEqual(0, cache.Size()); + + Assert.AreEqual(1, cache.Get(1).Index()); + + Assert.AreEqual(1, cache.Size()); + } + + [Test] + public void TestPutLoadObjects() + { + var cache = ObjectStoreCache<int, Value>(); + + cache.Put(1, new Value(1)); + + IDictionary map = StoreMap(); + + Assert.AreEqual(1, map.Count); + + Value v = (Value)map[1]; + + Assert.AreEqual(1, v.Index()); + + cache.LocalEvict(new[] { 1 }); + + Assert.AreEqual(0, cache.Size()); + + Assert.AreEqual(1, cache.Get(1).Index()); + + Assert.AreEqual(1, cache.Size()); + } + + [Test] + public void TestPutLoadAll() + { + var putMap = new Dictionary<int, string>(); + + for (int i = 0; i < 10; i++) + putMap.Add(i, "val_" + i); + + var cache = Cache(); + + cache.PutAll(putMap); + + IDictionary map = StoreMap(); + + Assert.AreEqual(10, map.Count); + + for (int i = 0; i < 10; i++) + Assert.AreEqual("val_" + i, map[i]); + + cache.Clear(); + + Assert.AreEqual(0, cache.Size()); + + ICollection<int> keys = new List<int>(); + + for (int i = 0; i < 10; i++) + keys.Add(i); + + IDictionary<int, string> loaded = cache.GetAll(keys); + + Assert.AreEqual(10, loaded.Count); + + for (int i = 0; i < 10; i++) + Assert.AreEqual("val_" + i, loaded[i]); + + Assert.AreEqual(10, cache.Size()); + } + + [Test] + public void TestRemove() + { + var cache = Cache(); + + for (int i = 0; i < 10; i++) + cache.Put(i, "val_" + i); + + IDictionary map = StoreMap(); + + Assert.AreEqual(10, map.Count); + + for (int i = 0; i < 5; i++) + cache.Remove(i); + + Assert.AreEqual(5, map.Count); + + for (int i = 5; i < 10; i++) + Assert.AreEqual("val_" + i, map[i]); + } + + [Test] + public void TestRemoveAll() + { + var cache = Cache(); + + for (int i = 0; i < 10; i++) + cache.Put(i, "val_" + i); + + IDictionary map = StoreMap(); + + Assert.AreEqual(10, map.Count); + + cache.RemoveAll(new List<int> { 0, 1, 2, 3, 4 }); + + Assert.AreEqual(5, map.Count); + + for (int i = 5; i < 10; i++) + Assert.AreEqual("val_" + i, map[i]); + } + + [Test] + public void TestTx() + { + var cache = Cache(); + + using (var tx = cache.Ignite.Transactions.TxStart()) + { + CacheTestStore.ExpCommit = true; + + tx.AddMeta("meta", 100); + + cache.Put(1, "val"); + + tx.Commit(); + } + + IDictionary map = StoreMap(); + + Assert.AreEqual(1, map.Count); + + Assert.AreEqual("val", map[1]); + } + + [Test] + public void TestLoadCacheMultithreaded() + { + CacheTestStore.LoadMultithreaded = true; + + var cache = Cache(); + + Assert.AreEqual(0, cache.Size()); + + cache.LocalLoadCache(null, 0, null); + + Assert.AreEqual(1000, cache.Size()); + + for (int i = 0; i < 1000; i++) + Assert.AreEqual("val_" + i, cache.Get(i)); + } + + [Test] + public void TestCustomStoreProperties() + { + var cache = CustomStoreCache(); + Assert.IsNotNull(cache); + + Assert.AreEqual(42, CacheTestStore.intProperty); + Assert.AreEqual("String value", CacheTestStore.stringProperty); + } + + [Test] + public void TestDynamicStoreStart() + { + var cache = TemplateStoreCache(); + + Assert.IsNotNull(cache); + + cache.Put(1, cache.Name); + + Assert.AreEqual(cache.Name, CacheTestStore.Map[1]); + } + + /// <summary> + /// Get's grid name for this test. + /// </summary> + /// <returns>Grid name.</returns> + protected virtual string GridName() + { + return null; + } + + private IDictionary StoreMap() + { + return CacheTestStore.Map; + } + + private ICache<int, string> Cache() + { + return PortableStoreCache<int, string>(); + } + + private ICache<TK, TV> PortableStoreCache<TK, TV>() + { + return Ignition.GetIgnite(GridName()).Cache<TK, TV>(PortableStoreCacheName); + } + + private ICache<TK, TV> ObjectStoreCache<TK, TV>() + { + return Ignition.GetIgnite(GridName()).Cache<TK, TV>(ObjectStoreCacheName); + } + + private ICache<int, string> CustomStoreCache() + { + return Ignition.GetIgnite(GridName()).Cache<int, string>(CustomStoreCacheName); + } + + private ICache<int, string> TemplateStoreCache() + { + var cacheName = TemplateStoreCacheName.Replace("*", Guid.NewGuid().ToString()); + + return Ignition.GetIgnite(GridName()).GetOrCreateCache<int, string>(cacheName); + } + } + + /// <summary> + /// + /// </summary> + public class NamedNodeCacheStoreTest : CacheStoreTest + { + /** <inheritDoc /> */ + protected override string GridName() + { + return "name"; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs new file mode 100644 index 0000000..770ca83 --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Store +{ + using System.Collections; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using Apache.Ignite.Core.Cache.Store; + + /// <summary> + /// Test cache store with parallel load. + /// </summary> + public class CacheTestParallelLoadStore : CacheParallelLoadStoreAdapter + { + /** Length of input data sequence */ + public const int InputDataLength = 10000; + + /** list of thread ids where Parse has been executed */ + private static readonly ConcurrentDictionary<int, int> ThreadIds = new ConcurrentDictionary<int, int>(); + + /// <summary> + /// Gets the count of unique threads that entered Parse method. + /// </summary> + public static int UniqueThreadCount + { + get { return ThreadIds.Count; } + } + + /// <summary> + /// Resets the test counters. + /// </summary> + public static void ResetCounters() + { + ThreadIds.Clear(); + } + + /** <inheritdoc /> */ + protected override IEnumerable GetInputData() + { + return Enumerable.Range(0, InputDataLength).Select(x => new Record {Id = x, Name = "Test Record " + x}); + } + + /** <inheritdoc /> */ + protected override KeyValuePair<object, object>? Parse(object inputRecord, params object[] args) + { + var threadId = Thread.CurrentThread.ManagedThreadId; + ThreadIds.GetOrAdd(threadId, threadId); + + var minId = (int)args[0]; + + var rec = (Record)inputRecord; + + return rec.Id >= minId + ? new KeyValuePair<object, object>(rec.Id, rec) + : (KeyValuePair<object, object>?) null; + } + + /// <summary> + /// Test store record. + /// </summary> + public class Record + { + /// <summary> + /// Gets or sets the identifier. + /// </summary> + public int Id { get; set; } + + /// <summary> + /// Gets or sets the name. + /// </summary> + public string Name { get; set; } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs new file mode 100644 index 0000000..9c381cb --- /dev/null +++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Store +{ + using System; + using System.Collections; + using System.Collections.Concurrent; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + using System.Linq; + using System.Threading; + using Apache.Ignite.Core.Cache.Store; + using Apache.Ignite.Core.Resource; + + [SuppressMessage("ReSharper", "FieldCanBeMadeReadOnly.Local")] + public class CacheTestStore : ICacheStore + { + public static readonly IDictionary Map = new ConcurrentDictionary<object, object>(); + + public static bool ExpCommit; + + public static bool LoadMultithreaded; + + public static bool LoadObjects; + + [InstanceResource] + private IIgnite _grid = null; + + [StoreSessionResource] +#pragma warning disable 649 + private ICacheStoreSession _ses; +#pragma warning restore 649 + + public static int intProperty; + + public static string stringProperty; + + public static void Reset() + { + Map.Clear(); + + ExpCommit = false; + LoadMultithreaded = false; + LoadObjects = false; + } + + public void LoadCache(Action<object, object> act, params object[] args) + { + Debug.Assert(_grid != null); + + if (LoadMultithreaded) + { + int cnt = 0; + + TestUtils.RunMultiThreaded(() => { + int i; + + while ((i = Interlocked.Increment(ref cnt) - 1) < 1000) + act(i, "val_" + i); + }, 8); + } + else + { + int start = (int)args[0]; + int cnt = (int)args[1]; + + for (int i = start; i < start + cnt; i++) + { + if (LoadObjects) + act(new Key(i), new Value(i)); + else + act(i, "val_" + i); + } + } + } + + public object Load(object key) + { + Debug.Assert(_grid != null); + + return Map[key]; + } + + public IDictionary LoadAll(ICollection keys) + { + Debug.Assert(_grid != null); + + return keys.OfType<object>().ToDictionary(key => key, Load); + } + + public void Write(object key, object val) + { + Debug.Assert(_grid != null); + + Map[key] = val; + } + + public void WriteAll(IDictionary map) + { + Debug.Assert(_grid != null); + + foreach (DictionaryEntry e in map) + Map[e.Key] = e.Value; + } + + public void Delete(object key) + { + Debug.Assert(_grid != null); + + Map.Remove(key); + } + + public void DeleteAll(ICollection keys) + { + Debug.Assert(_grid != null); + + foreach (object key in keys) + Map.Remove(key); + } + + public void SessionEnd(bool commit) + { + Debug.Assert(_grid != null); + + Debug.Assert(_ses != null); + } + + public int IntProperty + { + get { return intProperty; } + set { intProperty = value; } + } + + public string StringProperty + { + get { return stringProperty; } + set { stringProperty = value; } + } + } +}
