http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs new file mode 100644 index 0000000..fd26558 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using System; + using System.Collections; + using System.Collections.Generic; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Unmanaged; + + /// <summary> + /// Real cache enumerator communicating with Java. + /// </summary> + internal class CacheEnumerator<TK, TV> : PlatformDisposableTarget, IEnumerator<ICacheEntry<TK, TV>> + { + /** Operation: next value. */ + private const int OpNext = 1; + + /** Keep portable flag. */ + private readonly bool _keepPortable; + + /** Current entry. */ + private CacheEntry<TK, TV>? _cur; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + /// <param name="keepPortable">Keep portable flag.</param> + public CacheEnumerator(IUnmanagedTarget target, PortableMarshaller marsh, bool keepPortable) : + base(target, marsh) + { + _keepPortable = keepPortable; + } + + /** <inheritdoc /> */ + public bool MoveNext() + { + ThrowIfDisposed(); + + return DoInOp(OpNext, stream => + { + var reader = Marshaller.StartUnmarshal(stream, _keepPortable); + + bool hasNext = reader.ReadBoolean(); + + if (hasNext) + { + reader.DetachNext(); + TK key = reader.ReadObject<TK>(); + + reader.DetachNext(); + TV val = reader.ReadObject<TV>(); + + _cur = new CacheEntry<TK, TV>(key, val); + + return true; + } + + _cur = null; + + return false; + }); + } + + /** <inheritdoc /> */ + public ICacheEntry<TK, TV> Current + { + get + { + ThrowIfDisposed(); + + if (_cur == null) + throw new InvalidOperationException( + "Invalid enumerator state, enumeration is either finished or not started"); + + return _cur.Value; + } + } + + /** <inheritdoc /> */ + object IEnumerator.Current + { + get { return Current; } + } + + /** <inheritdoc /> */ + public void Reset() + { + throw new NotSupportedException("Specified method is not supported."); + } + + /** <inheritdoc /> */ + protected override T Unmarshal<T>(IPortableStream stream) + { + throw new InvalidOperationException("Should not be called."); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumeratorProxy.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumeratorProxy.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumeratorProxy.cs new file mode 100644 index 0000000..cadc58d --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumeratorProxy.cs @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Diagnostics; + using Apache.Ignite.Core.Cache; + + /// <summary> + /// Cache enumerator proxy. Required to support reset and early native iterator cleanup. + /// </summary> + internal class CacheEnumeratorProxy<TK, TV> : IEnumerator<ICacheEntry<TK, TV>> + { + /** Target cache. */ + private readonly CacheImpl<TK, TV> _cache; + + /** Local flag. */ + private readonly bool _loc; + + /** Peek modes. */ + private readonly int _peekModes; + + /** Target enumerator. */ + private CacheEnumerator<TK, TV> _target; + + /** Dispose flag. */ + private bool _disposed; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="cache">Target cache.</param> + /// <param name="loc">Local flag.</param> + /// <param name="peekModes">Peek modes.</param> + public CacheEnumeratorProxy(CacheImpl<TK, TV> cache, bool loc, int peekModes) + { + _cache = cache; + _loc = loc; + _peekModes = peekModes; + + CreateTarget(); + } + + /** <inheritdoc /> */ + public bool MoveNext() + { + CheckDisposed(); + + // No target => closed or finished. + if (_target == null) + return false; + + if (!_target.MoveNext()) + { + // Failed to advance => end is reached. + CloseTarget(); + + return false; + } + + return true; + } + + /** <inheritdoc /> */ + public ICacheEntry<TK, TV> Current + { + get + { + CheckDisposed(); + + if (_target == null) + throw new InvalidOperationException("Invalid enumerator state (did you call MoveNext()?)"); + + return _target.Current; + } + } + + /** <inheritdoc /> */ + object IEnumerator.Current + { + get { return Current; } + } + + /** <inheritdoc /> */ + public void Reset() + { + CheckDisposed(); + + if (_target != null) + CloseTarget(); + + CreateTarget(); + } + + /** <inheritdoc /> */ + public void Dispose() + { + if (!_disposed) + { + if (_target != null) + CloseTarget(); + + _disposed = true; + } + } + + /// <summary> + /// Get target enumerator. + /// </summary> + /// <returns>Target enumerator.</returns> + private void CreateTarget() + { + Debug.Assert(_target == null, "Previous target is not cleaned."); + + _target = _cache.CreateEnumerator(_loc, _peekModes); + } + + /// <summary> + /// Close the target. + /// </summary> + private void CloseTarget() + { + Debug.Assert(_target != null); + + _target.Dispose(); + + _target = null; + } + + /// <summary> + /// Check whether object is disposed. + /// </summary> + private void CheckDisposed() + { + if (_disposed) + throw new ObjectDisposedException("Cache enumerator has been disposed."); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs new file mode 100644 index 0000000..0301352 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -0,0 +1,932 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.Threading; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Expiry; + using Apache.Ignite.Core.Cache.Query; + using Apache.Ignite.Core.Cache.Query.Continuous; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Cache.Query; + using Apache.Ignite.Core.Impl.Cache.Query.Continuous; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Unmanaged; + using Apache.Ignite.Core.Portable; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Native cache wrapper. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] + internal class CacheImpl<TK, TV> : PlatformTarget, ICache<TK, TV> + { + /** Duration: unchanged. */ + private const long DurUnchanged = -2; + + /** Duration: eternal. */ + private const long DurEternal = -1; + + /** Duration: zero. */ + private const long DurZero = 0; + + /** Ignite instance. */ + private readonly Ignite _ignite; + + /** Flag: skip store. */ + private readonly bool _flagSkipStore; + + /** Flag: keep portable. */ + private readonly bool _flagKeepPortable; + + /** Flag: async mode.*/ + private readonly bool _flagAsync; + + /** Flag: no-retries.*/ + private readonly bool _flagNoRetries; + + /** + * Result converter for async InvokeAll operation. + * In future result processing there is only one TResult generic argument, + * and we can't get the type of ICacheEntryProcessorResult at compile time from it. + * This field caches converter for the last InvokeAll operation to avoid using reflection. + */ + private readonly ThreadLocal<object> _invokeAllConverter = new ThreadLocal<object>(); + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="grid">Grid.</param> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + /// <param name="flagSkipStore">Skip store flag.</param> + /// <param name="flagKeepPortable">Keep portable flag.</param> + /// <param name="flagAsync">Async mode flag.</param> + /// <param name="flagNoRetries">No-retries mode flag.</param> + public CacheImpl(Ignite grid, IUnmanagedTarget target, PortableMarshaller marsh, + bool flagSkipStore, bool flagKeepPortable, bool flagAsync, bool flagNoRetries) : base(target, marsh) + { + _ignite = grid; + _flagSkipStore = flagSkipStore; + _flagKeepPortable = flagKeepPortable; + _flagAsync = flagAsync; + _flagNoRetries = flagNoRetries; + } + + /** <inheritDoc /> */ + public IIgnite Ignite + { + get + { + return _ignite; + } + } + + /** <inheritDoc /> */ + public bool IsAsync + { + get { return _flagAsync; } + } + + /** <inheritDoc /> */ + public IFuture GetFuture() + { + throw new NotSupportedException("GetFuture() should be called through CacheProxyImpl"); + } + + /** <inheritDoc /> */ + public IFuture<TResult> GetFuture<TResult>() + { + throw new NotSupportedException("GetFuture() should be called through CacheProxyImpl"); + } + + /// <summary> + /// Gets and resets future for previous asynchronous operation. + /// </summary> + /// <param name="lastAsyncOpId">The last async op id.</param> + /// <returns> + /// Future for previous asynchronous operation. + /// </returns> + /// <exception cref="System.InvalidOperationException">Asynchronous mode is disabled</exception> + internal IFuture<TResult> GetFuture<TResult>(int lastAsyncOpId) + { + if (!_flagAsync) + throw IgniteUtils.GetAsyncModeDisabledException(); + + var converter = GetFutureResultConverter<TResult>(lastAsyncOpId); + + _invokeAllConverter.Value = null; + + return GetFuture((futId, futTypeId) => UU.TargetListenFutureForOperation(Target, futId, futTypeId, lastAsyncOpId), + _flagKeepPortable, converter); + } + + /** <inheritDoc /> */ + public string Name + { + get { return DoInOp<string>((int)CacheOp.GetName); } + } + + /** <inheritDoc /> */ + public bool IsEmpty + { + get { return Size() == 0; } + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithSkipStore() + { + if (_flagSkipStore) + return this; + + return new CacheImpl<TK, TV>(_ignite, UU.CacheWithSkipStore(Target), Marshaller, + true, _flagKeepPortable, _flagAsync, true); + } + + /// <summary> + /// Skip store flag getter. + /// </summary> + internal bool IsSkipStore { get { return _flagSkipStore; } } + + /** <inheritDoc /> */ + public ICache<TK1, TV1> WithKeepPortable<TK1, TV1>() + { + if (_flagKeepPortable) + { + var result = this as ICache<TK1, TV1>; + + if (result == null) + throw new InvalidOperationException( + "Can't change type of portable cache. WithKeepPortable has been called on an instance of " + + "portable cache with incompatible generic arguments."); + + return result; + } + + return new CacheImpl<TK1, TV1>(_ignite, UU.CacheWithKeepPortable(Target), Marshaller, + _flagSkipStore, true, _flagAsync, _flagNoRetries); + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithExpiryPolicy(IExpiryPolicy plc) + { + IgniteArgumentCheck.NotNull(plc, "plc"); + + long create = ConvertDuration(plc.GetExpiryForCreate()); + long update = ConvertDuration(plc.GetExpiryForUpdate()); + long access = ConvertDuration(plc.GetExpiryForAccess()); + + IUnmanagedTarget cache0 = UU.CacheWithExpiryPolicy(Target, create, update, access); + + return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepPortable, _flagAsync, _flagNoRetries); + } + + /// <summary> + /// Convert TimeSpan to duration recognizable by Java. + /// </summary> + /// <param name="dur">.Net duration.</param> + /// <returns>Java duration in milliseconds.</returns> + private static long ConvertDuration(TimeSpan? dur) + { + if (dur.HasValue) + { + if (dur.Value == TimeSpan.MaxValue) + return DurEternal; + + long dur0 = (long)dur.Value.TotalMilliseconds; + + return dur0 > 0 ? dur0 : DurZero; + } + + return DurUnchanged; + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithAsync() + { + return _flagAsync ? this : new CacheImpl<TK, TV>(_ignite, UU.CacheWithAsync(Target), Marshaller, + _flagSkipStore, _flagKeepPortable, true, _flagNoRetries); + } + + /** <inheritDoc /> */ + public bool KeepPortable + { + get { return _flagKeepPortable; } + } + + /** <inheritDoc /> */ + public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) + { + LoadCache0(p, args, (int)CacheOp.LoadCache); + } + + /** <inheritDoc /> */ + public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) + { + LoadCache0(p, args, (int)CacheOp.LocLoadCache); + } + + /// <summary> + /// Loads the cache. + /// </summary> + private void LoadCache0(ICacheEntryFilter<TK, TV> p, object[] args, int opId) + { + DoOutOp(opId, writer => + { + if (p != null) + { + var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry<TK, TV>((TK)k, (TV)v)), + Marshaller, KeepPortable); + writer.WriteObject(p0); + writer.WriteLong(p0.Handle); + } + else + writer.WriteObject<CacheEntryFilterHolder>(null); + + writer.WriteObjectArray(args); + }); + } + + /** <inheritDoc /> */ + public bool ContainsKey(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutOp((int)CacheOp.ContainsKey, key) == True; + } + + /** <inheritDoc /> */ + public bool ContainsKeys(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + return DoOutOp((int)CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)) == True; + } + + /** <inheritDoc /> */ + public TV LocalPeek(TK key, params CachePeekMode[] modes) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOp<TV>((int)CacheOp.Peek, writer => + { + writer.Write(key); + writer.WriteInt(EncodePeekModes(modes)); + }); + } + + /** <inheritDoc /> */ + public TV Get(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOp<TK, TV>((int)CacheOp.Get, key); + } + + /** <inheritDoc /> */ + public IDictionary<TK, TV> GetAll(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + return DoOutInOp((int)CacheOp.GetAll, + writer => WriteEnumerable(writer, keys), + input => + { + var reader = Marshaller.StartUnmarshal(input, _flagKeepPortable); + + return ReadGetAllDictionary(reader); + }); + } + + /** <inheritdoc /> */ + public void Put(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(val, "val"); + + DoOutOp((int)CacheOp.Put, key, val); + } + + /** <inheritDoc /> */ + public TV GetAndPut(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOp<TK, TV, TV>((int)CacheOp.GetAndPut, key, val); + } + + /** <inheritDoc /> */ + public TV GetAndReplace(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOp<TK, TV, TV>((int)CacheOp.GetAndReplace, key, val); + } + + /** <inheritDoc /> */ + public TV GetAndRemove(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOp<TK, TV>((int)CacheOp.GetAndRemove, key); + } + + /** <inheritdoc /> */ + public bool PutIfAbsent(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutOp((int) CacheOp.PutIfAbsent, key, val) == True; + } + + /** <inheritdoc /> */ + public TV GetAndPutIfAbsent(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutInOp<TK, TV, TV>((int)CacheOp.GetAndPutIfAbsent, key, val); + } + + /** <inheritdoc /> */ + public bool Replace(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutOp((int)CacheOp.Replace2, key, val) == True; + } + + /** <inheritdoc /> */ + public bool Replace(TK key, TV oldVal, TV newVal) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(oldVal, "oldVal"); + + IgniteArgumentCheck.NotNull(newVal, "newVal"); + + return DoOutOp((int)CacheOp.Replace3, key, oldVal, newVal) == True; + } + + /** <inheritdoc /> */ + public void PutAll(IDictionary<TK, TV> vals) + { + IgniteArgumentCheck.NotNull(vals, "vals"); + + DoOutOp((int) CacheOp.PutAll, writer => WriteDictionary(writer, vals)); + } + + /** <inheritdoc /> */ + public void LocalEvict(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + DoOutOp((int) CacheOp.LocEvict, writer => WriteEnumerable(writer, keys)); + } + + /** <inheritdoc /> */ + public void Clear() + { + UU.CacheClear(Target); + } + + /** <inheritdoc /> */ + public void Clear(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + DoOutOp((int)CacheOp.Clear, key); + } + + /** <inheritdoc /> */ + public void ClearAll(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + DoOutOp((int)CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); + } + + /** <inheritdoc /> */ + public void LocalClear(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + DoOutOp((int) CacheOp.LocalClear, key); + } + + /** <inheritdoc /> */ + public void LocalClearAll(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + DoOutOp((int)CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys)); + } + + /** <inheritdoc /> */ + public bool Remove(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutOp((int)CacheOp.RemoveObj, key) == True; + } + + /** <inheritDoc /> */ + public bool Remove(TK key, TV val) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(val, "val"); + + return DoOutOp((int)CacheOp.RemoveBool, key, val) == True; + } + + /** <inheritDoc /> */ + public void RemoveAll(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + DoOutOp((int)CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys)); + } + + /** <inheritDoc /> */ + public void RemoveAll() + { + UU.CacheRemoveAll(Target); + } + + /** <inheritDoc /> */ + public int LocalSize(params CachePeekMode[] modes) + { + return Size0(true, modes); + } + + /** <inheritDoc /> */ + public int Size(params CachePeekMode[] modes) + { + return Size0(false, modes); + } + + /// <summary> + /// Internal size routine. + /// </summary> + /// <param name="loc">Local flag.</param> + /// <param name="modes">peek modes</param> + /// <returns>Size.</returns> + private int Size0(bool loc, params CachePeekMode[] modes) + { + int modes0 = EncodePeekModes(modes); + + return UU.CacheSize(Target, modes0, loc); + } + + /** <inheritDoc /> */ + public void LocalPromote(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + DoOutOp((int)CacheOp.LocPromote, writer => WriteEnumerable(writer, keys)); + } + + /** <inheritdoc /> */ + public TR Invoke<TR, TA>(TK key, ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg) + { + IgniteArgumentCheck.NotNull(key, "key"); + + IgniteArgumentCheck.NotNull(processor, "processor"); + + var holder = new CacheEntryProcessorHolder(processor, arg, + (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TA)a), typeof(TK), typeof(TV)); + + return DoOutInOp((int)CacheOp.Invoke, writer => + { + writer.Write(key); + writer.Write(holder); + }, + input => GetResultOrThrow<TR>(Unmarshal<object>(input))); + } + + /** <inheritdoc /> */ + public IDictionary<TK, ICacheEntryProcessorResult<TR>> InvokeAll<TR, TA>(IEnumerable<TK> keys, + ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + IgniteArgumentCheck.NotNull(processor, "processor"); + + var holder = new CacheEntryProcessorHolder(processor, arg, + (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TA)a), typeof(TK), typeof(TV)); + + return DoOutInOp((int)CacheOp.InvokeAll, writer => + { + WriteEnumerable(writer, keys); + writer.Write(holder); + }, + input => + { + if (IsAsync) + _invokeAllConverter.Value = (Func<PortableReaderImpl, IDictionary<TK, ICacheEntryProcessorResult<TR>>>) + (reader => ReadInvokeAllResults<TR>(reader.Stream)); + + return ReadInvokeAllResults<TR>(input); + }); + } + + /** <inheritdoc /> */ + public ICacheLock Lock(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutInOp((int)CacheOp.Lock, writer => + { + writer.Write(key); + }, input => new CacheLock(input.ReadInt(), Target)); + } + + /** <inheritdoc /> */ + public ICacheLock LockAll(IEnumerable<TK> keys) + { + IgniteArgumentCheck.NotNull(keys, "keys"); + + return DoOutInOp((int)CacheOp.LockAll, writer => + { + WriteEnumerable(writer, keys); + }, input => new CacheLock(input.ReadInt(), Target)); + } + + /** <inheritdoc /> */ + public bool IsLocalLocked(TK key, bool byCurrentThread) + { + IgniteArgumentCheck.NotNull(key, "key"); + + return DoOutOp((int)CacheOp.IsLocalLocked, writer => + { + writer.Write(key); + writer.WriteBoolean(byCurrentThread); + }) == True; + } + + /** <inheritDoc /> */ + public ICacheMetrics GetMetrics() + { + return DoInOp((int)CacheOp.Metrics, stream => + { + IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false); + + return new CacheMetricsImpl(reader); + }); + } + + /** <inheritDoc /> */ + public IFuture Rebalance() + { + return GetFuture<object>((futId, futTyp) => UU.CacheRebalance(Target, futId)); + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithNoRetries() + { + if (_flagNoRetries) + return this; + + return new CacheImpl<TK, TV>(_ignite, UU.CacheWithNoRetries(Target), Marshaller, + _flagSkipStore, _flagKeepPortable, _flagAsync, true); + } + + /// <summary> + /// Gets a value indicating whether this instance is in no-retries mode. + /// </summary> + internal bool IsNoRetries + { + get { return _flagNoRetries; } + } + + #region Queries + + /** <inheritDoc /> */ + public IQueryCursor<IList> QueryFields(SqlFieldsQuery qry) + { + IgniteArgumentCheck.NotNull(qry, "qry"); + + if (string.IsNullOrEmpty(qry.Sql)) + throw new ArgumentException("Sql cannot be null or empty"); + + IUnmanagedTarget cursor; + + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + var writer = Marshaller.StartMarshal(stream); + + writer.WriteBoolean(qry.Local); + writer.WriteString(qry.Sql); + writer.WriteInt(qry.PageSize); + + WriteQueryArgs(writer, qry.Arguments); + + FinishMarshal(writer); + + cursor = UU.CacheOutOpQueryCursor(Target, (int) CacheOp.QrySqlFields, stream.SynchronizeOutput()); + } + + return new FieldsQueryCursor(cursor, Marshaller, _flagKeepPortable); + } + + /** <inheritDoc /> */ + public IQueryCursor<ICacheEntry<TK, TV>> Query(QueryBase qry) + { + IgniteArgumentCheck.NotNull(qry, "qry"); + + IUnmanagedTarget cursor; + + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + var writer = Marshaller.StartMarshal(stream); + + qry.Write(writer, KeepPortable); + + FinishMarshal(writer); + + cursor = UU.CacheOutOpQueryCursor(Target, (int)qry.OpId, stream.SynchronizeOutput()); + } + + return new QueryCursor<TK, TV>(cursor, Marshaller, _flagKeepPortable); + } + + /// <summary> + /// Write query arguments. + /// </summary> + /// <param name="writer">Writer.</param> + /// <param name="args">Arguments.</param> + private static void WriteQueryArgs(PortableWriterImpl writer, object[] args) + { + if (args == null) + writer.WriteInt(0); + else + { + writer.WriteInt(args.Length); + + foreach (var arg in args) + writer.WriteObject(arg); + } + } + + /** <inheritdoc /> */ + public IContinuousQueryHandle QueryContinuous(ContinuousQuery<TK, TV> qry) + { + IgniteArgumentCheck.NotNull(qry, "qry"); + + return QueryContinuousImpl(qry, null); + } + + /** <inheritdoc /> */ + public IContinuousQueryHandle<ICacheEntry<TK, TV>> QueryContinuous(ContinuousQuery<TK, TV> qry, QueryBase initialQry) + { + IgniteArgumentCheck.NotNull(qry, "qry"); + IgniteArgumentCheck.NotNull(initialQry, "initialQry"); + + return QueryContinuousImpl(qry, initialQry); + } + + /// <summary> + /// QueryContinuous implementation. + /// </summary> + private IContinuousQueryHandle<ICacheEntry<TK, TV>> QueryContinuousImpl(ContinuousQuery<TK, TV> qry, + QueryBase initialQry) + { + qry.Validate(); + + var hnd = new ContinuousQueryHandleImpl<TK, TV>(qry, Marshaller, _flagKeepPortable); + + using (var stream = IgniteManager.Memory.Allocate().Stream()) + { + var writer = Marshaller.StartMarshal(stream); + + hnd.Start(_ignite, writer, () => + { + if (initialQry != null) + { + writer.WriteInt((int) initialQry.OpId); + + initialQry.Write(writer, KeepPortable); + } + else + writer.WriteInt(-1); // no initial query + + FinishMarshal(writer); + + // ReSharper disable once AccessToDisposedClosure + return UU.CacheOutOpContinuousQuery(Target, (int)CacheOp.QryContinuous, stream.SynchronizeOutput()); + }, qry); + } + + return hnd; + } + + #endregion + + #region Enumerable support + + /** <inheritdoc /> */ + public IEnumerable<ICacheEntry<TK, TV>> GetLocalEntries(CachePeekMode[] peekModes) + { + return new CacheEnumerable<TK, TV>(this, EncodePeekModes(peekModes)); + } + + /** <inheritdoc /> */ + public IEnumerator<ICacheEntry<TK, TV>> GetEnumerator() + { + return new CacheEnumeratorProxy<TK, TV>(this, false, 0); + } + + /** <inheritdoc /> */ + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + /// <summary> + /// Create real cache enumerator. + /// </summary> + /// <param name="loc">Local flag.</param> + /// <param name="peekModes">Peek modes for local enumerator.</param> + /// <returns>Cache enumerator.</returns> + internal CacheEnumerator<TK, TV> CreateEnumerator(bool loc, int peekModes) + { + if (loc) + return new CacheEnumerator<TK, TV>(UU.CacheLocalIterator(Target, peekModes), Marshaller, _flagKeepPortable); + + return new CacheEnumerator<TK, TV>(UU.CacheIterator(Target), Marshaller, _flagKeepPortable); + } + + #endregion + + /** <inheritDoc /> */ + protected override T Unmarshal<T>(IPortableStream stream) + { + return Marshaller.Unmarshal<T>(stream, _flagKeepPortable); + } + + /// <summary> + /// Encodes the peek modes into a single int value. + /// </summary> + private static int EncodePeekModes(CachePeekMode[] modes) + { + int modesEncoded = 0; + + if (modes != null) + { + foreach (var mode in modes) + modesEncoded |= (int) mode; + } + + return modesEncoded; + } + + /// <summary> + /// Unwraps an exception from PortableResultHolder, if any. Otherwise does the cast. + /// </summary> + /// <typeparam name="T">Result type.</typeparam> + /// <param name="obj">Object.</param> + /// <returns>Result.</returns> + private static T GetResultOrThrow<T>(object obj) + { + var holder = obj as PortableResultWrapper; + + if (holder != null) + { + var err = holder.Result as Exception; + + if (err != null) + throw err as CacheEntryProcessorException ?? new CacheEntryProcessorException(err); + } + + return obj == null ? default(T) : (T) obj; + } + + /// <summary> + /// Reads results of InvokeAll operation. + /// </summary> + /// <typeparam name="T">The type of the result.</typeparam> + /// <param name="inStream">Stream.</param> + /// <returns>Results of InvokeAll operation.</returns> + private IDictionary<TK, ICacheEntryProcessorResult<T>> ReadInvokeAllResults<T>(IPortableStream inStream) + { + var count = inStream.ReadInt(); + + if (count == -1) + return null; + + var results = new Dictionary<TK, ICacheEntryProcessorResult<T>>(count); + + for (var i = 0; i < count; i++) + { + var key = Unmarshal<TK>(inStream); + + var hasError = inStream.ReadBool(); + + results[key] = hasError + ? new CacheEntryProcessorResult<T>(ReadException(inStream)) + : new CacheEntryProcessorResult<T>(Unmarshal<T>(inStream)); + } + + return results; + } + + /// <summary> + /// Reads the exception, either in portable wrapper form, or as a pair of strings. + /// </summary> + /// <param name="inStream">The stream.</param> + /// <returns>Exception.</returns> + private CacheEntryProcessorException ReadException(IPortableStream inStream) + { + var item = Unmarshal<object>(inStream); + + var clsName = item as string; + + if (clsName == null) + return new CacheEntryProcessorException((Exception) ((PortableResultWrapper) item).Result); + + var msg = Unmarshal<string>(inStream); + + return new CacheEntryProcessorException(ExceptionUtils.GetException(clsName, msg)); + } + + /// <summary> + /// Read dictionary returned by GET_ALL operation. + /// </summary> + /// <param name="reader">Reader.</param> + /// <returns>Dictionary.</returns> + private static IDictionary<TK, TV> ReadGetAllDictionary(PortableReaderImpl reader) + { + IPortableStream stream = reader.Stream; + + if (stream.ReadBool()) + { + int size = stream.ReadInt(); + + IDictionary<TK, TV> res = new Dictionary<TK, TV>(size); + + for (int i = 0; i < size; i++) + { + TK key = reader.ReadObject<TK>(); + TV val = reader.ReadObject<TV>(); + + res[key] = val; + } + + return res; + } + return null; + } + + /// <summary> + /// Gets the future result converter based on the last operation id. + /// </summary> + /// <typeparam name="TResult">The type of the future result.</typeparam> + /// <param name="lastAsyncOpId">The last op id.</param> + /// <returns>Future result converter.</returns> + private Func<PortableReaderImpl, TResult> GetFutureResultConverter<TResult>(int lastAsyncOpId) + { + if (lastAsyncOpId == (int) CacheOp.GetAll) + return reader => (TResult)ReadGetAllDictionary(reader); + + if (lastAsyncOpId == (int)CacheOp.Invoke) + return reader => { throw ReadException(reader.Stream); }; + + if (lastAsyncOpId == (int) CacheOp.InvokeAll) + return _invokeAllConverter.Value as Func<PortableReaderImpl, TResult>; + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheLock.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheLock.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheLock.cs new file mode 100644 index 0000000..ceb3b05 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheLock.cs @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using System; + using System.Diagnostics; + using System.Threading; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Impl.Unmanaged; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Cache lock implementation. + /// </summary> + internal class CacheLock : ICacheLock + { + /** Unique lock ID.*/ + private readonly long _id; + + /** Cache. */ + private readonly IUnmanagedTarget _cache; + + /** State (-1 for disposed, >=0 for number of currently executing methods). */ + private int _state; + + /** Current number of lock contenders. */ + private int _counter; + + /// <summary> + /// Initializes a new instance of the <see cref="CacheLock"/> class. + /// </summary> + /// <param name="id">Lock id.</param> + /// <param name="cache">Cache.</param> + public CacheLock(long id, IUnmanagedTarget cache) + { + Debug.Assert(cache != null); + + _id = id; + _cache = cache; + } + + /** <inheritDoc /> */ + public void Enter() + { + lock (this) + { + ThrowIfDisposed(); + + _state++; + } + + var res = false; + + try + { + UU.CacheEnterLock(_cache, _id); + + res = true; + } + finally + { + lock (this) + { + if (res) + _counter++; + + _state--; + } + } + } + + /** <inheritDoc /> */ + public bool TryEnter() + { + return TryEnter(TimeSpan.FromMilliseconds(-1)); + } + + /** <inheritDoc /> */ + public bool TryEnter(TimeSpan timeout) + { + lock (this) + { + ThrowIfDisposed(); + + _state++; + } + + var res = false; + + try + { + return res = UU.CacheTryEnterLock(_cache, _id, (long)timeout.TotalMilliseconds); + } + finally + { + lock (this) + { + if (res) + _counter++; + + _state--; + } + } + } + + /** <inheritDoc /> */ + public void Exit() + { + lock (this) + { + ThrowIfDisposed(); + + UU.CacheExitLock(_cache, _id); + + _counter--; + } + } + + /** <inheritDoc /> */ + public void Dispose() + { + lock (this) + { + ThrowIfDisposed(); + + if (_state > 0 || _counter > 0) + throw new SynchronizationLockException( + "The lock is being disposed while still being used. " + + "It either is being held by a thread and/or has active waiters waiting to acquire the lock."); + + UU.CacheCloseLock(_cache, _id); + + _state = -1; + + GC.SuppressFinalize(this); + } + } + + /// <summary> + /// Finalizes an instance of the <see cref="CacheLock"/> class. + /// </summary> + ~CacheLock() + { + UU.CacheCloseLock(_cache, _id); + } + + /// <summary> + /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed. + /// </summary> + private void ThrowIfDisposed() + { + if (_state < 0) + throw new ObjectDisposedException("CacheLock", "CacheLock has been disposed."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs new file mode 100644 index 0000000..b5982f6 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheMetricsImpl.cs @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Cache metrics used to obtain statistics on cache. + /// </summary> + internal class CacheMetricsImpl : ICacheMetrics + { + /// <summary> + /// Initializes a new instance of the <see cref="CacheMetricsImpl"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public CacheMetricsImpl(IPortableRawReader reader) + { + CacheGets = reader.ReadLong(); + CachePuts = reader.ReadLong(); + CacheHits = reader.ReadLong(); + CacheMisses = reader.ReadLong(); + CacheTxCommits = reader.ReadLong(); + CacheTxRollbacks = reader.ReadLong(); + CacheEvictions = reader.ReadLong(); + CacheRemovals = reader.ReadLong(); + AveragePutTime = reader.ReadFloat(); + AverageGetTime = reader.ReadFloat(); + AverageRemoveTime = reader.ReadFloat(); + AverageTxCommitTime = reader.ReadFloat(); + AverageTxRollbackTime = reader.ReadFloat(); + CacheName = reader.ReadString(); + OverflowSize = reader.ReadLong(); + OffHeapEntriesCount = reader.ReadLong(); + OffHeapAllocatedSize = reader.ReadLong(); + Size = reader.ReadInt(); + KeySize = reader.ReadInt(); + IsEmpty = reader.ReadBoolean(); + DhtEvictQueueCurrentSize = reader.ReadInt(); + TxThreadMapSize = reader.ReadInt(); + TxXidMapSize = reader.ReadInt(); + TxCommitQueueSize = reader.ReadInt(); + TxPrepareQueueSize = reader.ReadInt(); + TxStartVersionCountsSize = reader.ReadInt(); + TxCommittedVersionsSize = reader.ReadInt(); + TxRolledbackVersionsSize = reader.ReadInt(); + TxDhtThreadMapSize = reader.ReadInt(); + TxDhtXidMapSize = reader.ReadInt(); + TxDhtCommitQueueSize = reader.ReadInt(); + TxDhtPrepareQueueSize = reader.ReadInt(); + TxDhtStartVersionCountsSize = reader.ReadInt(); + TxDhtCommittedVersionsSize = reader.ReadInt(); + TxDhtRolledbackVersionsSize = reader.ReadInt(); + IsWriteBehindEnabled = reader.ReadBoolean(); + WriteBehindFlushSize = reader.ReadInt(); + WriteBehindFlushThreadCount = reader.ReadInt(); + WriteBehindFlushFrequency = reader.ReadLong(); + WriteBehindStoreBatchSize = reader.ReadInt(); + WriteBehindTotalCriticalOverflowCount = reader.ReadInt(); + WriteBehindCriticalOverflowCount = reader.ReadInt(); + WriteBehindErrorRetryCount = reader.ReadInt(); + WriteBehindBufferSize = reader.ReadInt(); + KeyType = reader.ReadString(); + ValueType = reader.ReadString(); + IsStoreByValue = reader.ReadBoolean(); + IsStatisticsEnabled = reader.ReadBoolean(); + IsManagementEnabled = reader.ReadBoolean(); + IsReadThrough = reader.ReadBoolean(); + IsWriteThrough = reader.ReadBoolean(); + CacheHitPercentage = reader.ReadFloat(); + CacheMissPercentage = reader.ReadFloat(); + } + + /** <inheritdoc /> */ + public long CacheHits { get; private set; } + + /** <inheritdoc /> */ + public float CacheHitPercentage { get; private set; } + + /** <inheritdoc /> */ + public long CacheMisses { get; private set; } + + /** <inheritdoc /> */ + public float CacheMissPercentage { get; private set; } + + /** <inheritdoc /> */ + public long CacheGets { get; private set; } + + /** <inheritdoc /> */ + public long CachePuts { get; private set; } + + /** <inheritdoc /> */ + public long CacheRemovals { get; private set; } + + /** <inheritdoc /> */ + public long CacheEvictions { get; private set; } + + /** <inheritdoc /> */ + public float AverageGetTime { get; private set; } + + /** <inheritdoc /> */ + public float AveragePutTime { get; private set; } + + /** <inheritdoc /> */ + public float AverageRemoveTime { get; private set; } + + /** <inheritdoc /> */ + public float AverageTxCommitTime { get; private set; } + + /** <inheritdoc /> */ + public float AverageTxRollbackTime { get; private set; } + + /** <inheritdoc /> */ + public long CacheTxCommits { get; private set; } + + /** <inheritdoc /> */ + public long CacheTxRollbacks { get; private set; } + + /** <inheritdoc /> */ + public string CacheName { get; private set; } + + /** <inheritdoc /> */ + public long OverflowSize { get; private set; } + + /** <inheritdoc /> */ + public long OffHeapEntriesCount { get; private set; } + + /** <inheritdoc /> */ + public long OffHeapAllocatedSize { get; private set; } + + /** <inheritdoc /> */ + public int Size { get; private set; } + + /** <inheritdoc /> */ + public int KeySize { get; private set; } + + /** <inheritdoc /> */ + public bool IsEmpty { get; private set; } + + /** <inheritdoc /> */ + public int DhtEvictQueueCurrentSize { get; private set; } + + /** <inheritdoc /> */ + public int TxThreadMapSize { get; private set; } + + /** <inheritdoc /> */ + public int TxXidMapSize { get; private set; } + + /** <inheritdoc /> */ + public int TxCommitQueueSize { get; private set; } + + /** <inheritdoc /> */ + public int TxPrepareQueueSize { get; private set; } + + /** <inheritdoc /> */ + public int TxStartVersionCountsSize { get; private set; } + + /** <inheritdoc /> */ + public int TxCommittedVersionsSize { get; private set; } + + /** <inheritdoc /> */ + public int TxRolledbackVersionsSize { get; private set; } + + /** <inheritdoc /> */ + public int TxDhtThreadMapSize { get; private set; } + + /** <inheritdoc /> */ + public int TxDhtXidMapSize { get; private set; } + + /** <inheritdoc /> */ + public int TxDhtCommitQueueSize { get; private set; } + + /** <inheritdoc /> */ + public int TxDhtPrepareQueueSize { get; private set; } + + /** <inheritdoc /> */ + public int TxDhtStartVersionCountsSize { get; private set; } + + /** <inheritdoc /> */ + public int TxDhtCommittedVersionsSize { get; private set; } + + /** <inheritdoc /> */ + public int TxDhtRolledbackVersionsSize { get; private set; } + + /** <inheritdoc /> */ + public bool IsWriteBehindEnabled { get; private set; } + + /** <inheritdoc /> */ + public int WriteBehindFlushSize { get; private set; } + + /** <inheritdoc /> */ + public int WriteBehindFlushThreadCount { get; private set; } + + /** <inheritdoc /> */ + public long WriteBehindFlushFrequency { get; private set; } + + /** <inheritdoc /> */ + public int WriteBehindStoreBatchSize { get; private set; } + + /** <inheritdoc /> */ + public int WriteBehindTotalCriticalOverflowCount { get; private set; } + + /** <inheritdoc /> */ + public int WriteBehindCriticalOverflowCount { get; private set; } + + /** <inheritdoc /> */ + public int WriteBehindErrorRetryCount { get; private set; } + + /** <inheritdoc /> */ + public int WriteBehindBufferSize { get; private set; } + + /** <inheritdoc /> */ + public string KeyType { get; private set; } + + /** <inheritdoc /> */ + public string ValueType { get; private set; } + + /** <inheritdoc /> */ + public bool IsStoreByValue { get; private set; } + + /** <inheritdoc /> */ + public bool IsStatisticsEnabled { get; private set; } + + /** <inheritdoc /> */ + public bool IsManagementEnabled { get; private set; } + + /** <inheritdoc /> */ + public bool IsReadThrough { get; private set; } + + /** <inheritdoc /> */ + public bool IsWriteThrough { get; private set; } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs new file mode 100644 index 0000000..3eb63ca --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + /// <summary> + /// Cache opcodes. + /// </summary> + internal enum CacheOp + { + Clear = 1, + ClearAll = 2, + ContainsKey = 3, + ContainsKeys = 4, + Get = 5, + GetAll = 6, + GetAndPut = 7, + GetAndPutIfAbsent = 8, + GetAndRemove = 9, + GetAndReplace = 10, + GetName = 11, + Invoke = 12, + InvokeAll = 13, + IsLocalLocked = 14, + LoadCache = 15, + LocEvict = 16, + LocLoadCache = 17, + LocPromote = 18, + LocalClear = 20, + LocalClearAll = 21, + Lock = 22, + LockAll = 23, + Metrics = 24, + Peek = 25, + Put = 26, + PutAll = 27, + PutIfAbsent = 28, + QryContinuous = 29, + QryScan = 30, + QrySql = 31, + QrySqlFields = 32, + QryTxt = 33, + RemoveAll = 34, + RemoveBool = 35, + RemoveObj = 36, + Replace2 = 37, + Replace3 = 38 + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs new file mode 100644 index 0000000..5c6ee07 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using System.Collections; + using System.Collections.Generic; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + using System.Threading; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Expiry; + using Apache.Ignite.Core.Cache.Query; + using Apache.Ignite.Core.Cache.Query.Continuous; + using Apache.Ignite.Core.Common; + + /// <summary> + /// Cache proxy. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] + internal class CacheProxyImpl<TK, TV> : ICache<TK, TV> + { + /** wrapped cache instance */ + private readonly CacheImpl<TK, TV> _cache; + + /** */ + private readonly ThreadLocal<int> _lastAsyncOp = new ThreadLocal<int>(() => PlatformTarget.OpNone); + + /// <summary> + /// Initializes a new instance of the <see cref="CacheProxyImpl{K, V}"/> class. + /// </summary> + /// <param name="cache">The cache to wrap.</param> + public CacheProxyImpl(CacheImpl<TK, TV> cache) + { + Debug.Assert(cache != null); + + _cache = cache; + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithSkipStore() + { + return _cache.IsSkipStore ? this : new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>)_cache.WithSkipStore()); + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithExpiryPolicy(IExpiryPolicy plc) + { + return new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>)_cache.WithExpiryPolicy(plc)); + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithAsync() + { + return IsAsync ? this : new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>) _cache.WithAsync()); + } + + /** <inheritDoc /> */ + public bool IsAsync + { + get { return _cache.IsAsync; } + } + + /** <inheritDoc /> */ + public IFuture GetFuture() + { + return GetFuture<object>(); + } + + /** <inheritDoc /> */ + public IFuture<TResult> GetFuture<TResult>() + { + var fut = _cache.GetFuture<TResult>(_lastAsyncOp.Value); + + ClearLastAsyncOp(); + + return fut; + } + + /** <inheritDoc /> */ + public IEnumerator<ICacheEntry<TK, TV>> GetEnumerator() + { + return _cache.GetEnumerator(); + } + + /** <inheritDoc /> */ + IEnumerator IEnumerable.GetEnumerator() + { + return ((IEnumerable) _cache).GetEnumerator(); + } + + /** <inheritDoc /> */ + public string Name + { + get { return _cache.Name; } + } + + /** <inheritDoc /> */ + public IIgnite Ignite + { + get { return _cache.Ignite; } + } + + /** <inheritDoc /> */ + public bool IsEmpty + { + get { return _cache.IsEmpty; } + } + + /** <inheritDoc /> */ + public bool KeepPortable + { + get { return _cache.KeepPortable; } + } + + /// <summary> + /// Skip store flag. + /// </summary> + internal bool SkipStore + { + get { return _cache.IsSkipStore; } + } + + /** <inheritDoc /> */ + public ICache<TK1, TV1> WithKeepPortable<TK1, TV1>() + { + return new CacheProxyImpl<TK1, TV1>((CacheImpl<TK1, TV1>) _cache.WithKeepPortable<TK1, TV1>()); + } + + /** <inheritDoc /> */ + public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) + { + _cache.LoadCache(p, args); + + SetLastAsyncOp(CacheOp.LoadCache); + } + + /** <inheritDoc /> */ + public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) + { + _cache.LocalLoadCache(p, args); + + SetLastAsyncOp(CacheOp.LocLoadCache); + } + + /** <inheritDoc /> */ + public bool ContainsKey(TK key) + { + var result = _cache.ContainsKey(key); + + SetLastAsyncOp(CacheOp.ContainsKey); + + return result; + } + + /** <inheritDoc /> */ + public bool ContainsKeys(IEnumerable<TK> keys) + { + var result = _cache.ContainsKeys(keys); + + SetLastAsyncOp(CacheOp.ContainsKeys); + + return result; + } + + /** <inheritDoc /> */ + public TV LocalPeek(TK key, params CachePeekMode[] modes) + { + return _cache.LocalPeek(key, modes); + } + + /** <inheritDoc /> */ + public TV Get(TK key) + { + var result = _cache.Get(key); + + SetLastAsyncOp(CacheOp.Get); + + return result; + } + + /** <inheritDoc /> */ + public IDictionary<TK, TV> GetAll(IEnumerable<TK> keys) + { + var result = _cache.GetAll(keys); + + SetLastAsyncOp(CacheOp.GetAll); + + return result; + } + + /** <inheritDoc /> */ + public void Put(TK key, TV val) + { + _cache.Put(key, val); + + SetLastAsyncOp(CacheOp.Put); + } + + /** <inheritDoc /> */ + public TV GetAndPut(TK key, TV val) + { + var result = _cache.GetAndPut(key, val); + + SetLastAsyncOp(CacheOp.GetAndPut); + + return result; + } + + /** <inheritDoc /> */ + public TV GetAndReplace(TK key, TV val) + { + var result = _cache.GetAndReplace(key, val); + + SetLastAsyncOp(CacheOp.GetAndReplace); + + return result; + } + + /** <inheritDoc /> */ + public TV GetAndRemove(TK key) + { + var result = _cache.GetAndRemove(key); + + SetLastAsyncOp(CacheOp.GetAndRemove); + + return result; + } + + /** <inheritDoc /> */ + public bool PutIfAbsent(TK key, TV val) + { + var result = _cache.PutIfAbsent(key, val); + + SetLastAsyncOp(CacheOp.PutIfAbsent); + + return result; + } + + /** <inheritDoc /> */ + public TV GetAndPutIfAbsent(TK key, TV val) + { + var result = _cache.GetAndPutIfAbsent(key, val); + + SetLastAsyncOp(CacheOp.GetAndPutIfAbsent); + + return result; + } + + /** <inheritDoc /> */ + public bool Replace(TK key, TV val) + { + var result = _cache.Replace(key, val); + + SetLastAsyncOp(CacheOp.Replace2); + + return result; + } + + /** <inheritDoc /> */ + public bool Replace(TK key, TV oldVal, TV newVal) + { + var result = _cache.Replace(key, oldVal, newVal); + + SetLastAsyncOp(CacheOp.Replace3); + + return result; + } + + /** <inheritDoc /> */ + public void PutAll(IDictionary<TK, TV> vals) + { + _cache.PutAll(vals); + + SetLastAsyncOp(CacheOp.PutAll); + } + + /** <inheritDoc /> */ + public void LocalEvict(IEnumerable<TK> keys) + { + _cache.LocalEvict(keys); + } + + /** <inheritDoc /> */ + public void Clear() + { + _cache.Clear(); + + ClearLastAsyncOp(); + } + + /** <inheritDoc /> */ + public void Clear(TK key) + { + _cache.Clear(key); + + SetLastAsyncOp(CacheOp.Clear); + } + + /** <inheritDoc /> */ + public void ClearAll(IEnumerable<TK> keys) + { + _cache.ClearAll(keys); + + SetLastAsyncOp(CacheOp.ClearAll); + } + + /** <inheritDoc /> */ + public void LocalClear(TK key) + { + _cache.LocalClear(key); + } + + /** <inheritDoc /> */ + public void LocalClearAll(IEnumerable<TK> keys) + { + _cache.LocalClearAll(keys); + } + + /** <inheritDoc /> */ + public bool Remove(TK key) + { + var result = _cache.Remove(key); + + SetLastAsyncOp(CacheOp.RemoveObj); + + return result; + } + + /** <inheritDoc /> */ + public bool Remove(TK key, TV val) + { + var result = _cache.Remove(key, val); + + SetLastAsyncOp(CacheOp.RemoveBool); + + return result; + } + + /** <inheritDoc /> */ + public void RemoveAll(IEnumerable<TK> keys) + { + _cache.RemoveAll(keys); + + SetLastAsyncOp(CacheOp.RemoveAll); + } + + /** <inheritDoc /> */ + public void RemoveAll() + { + _cache.RemoveAll(); + + ClearLastAsyncOp(); + } + + /** <inheritDoc /> */ + public int LocalSize(params CachePeekMode[] modes) + { + return _cache.LocalSize(modes); + } + + /** <inheritDoc /> */ + public int Size(params CachePeekMode[] modes) + { + var result = _cache.Size(modes); + + ClearLastAsyncOp(); + + return result; + } + + /** <inheritDoc /> */ + public void LocalPromote(IEnumerable<TK> keys) + { + _cache.LocalPromote(keys); + } + + /** <inheritDoc /> */ + public IQueryCursor<ICacheEntry<TK, TV>> Query(QueryBase qry) + { + return _cache.Query(qry); + } + + /** <inheritDoc /> */ + public IQueryCursor<IList> QueryFields(SqlFieldsQuery qry) + { + return _cache.QueryFields(qry); + } + + /** <inheritDoc /> */ + public IContinuousQueryHandle QueryContinuous(ContinuousQuery<TK, TV> qry) + { + return _cache.QueryContinuous(qry); + } + + /** <inheritDoc /> */ + public IContinuousQueryHandle<ICacheEntry<TK, TV>> QueryContinuous(ContinuousQuery<TK, TV> qry, QueryBase initialQry) + { + return _cache.QueryContinuous(qry, initialQry); + } + + /** <inheritDoc /> */ + public IEnumerable<ICacheEntry<TK, TV>> GetLocalEntries(params CachePeekMode[] peekModes) + { + return _cache.GetLocalEntries(peekModes); + } + + /** <inheritDoc /> */ + public TR Invoke<TR, TA>(TK key, ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg) + { + var result = _cache.Invoke(key, processor, arg); + + SetLastAsyncOp(CacheOp.Invoke); + + return result; + } + + /** <inheritDoc /> */ + public IDictionary<TK, ICacheEntryProcessorResult<TR>> InvokeAll<TR, TA>(IEnumerable<TK> keys, + ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg) + { + var result = _cache.InvokeAll(keys, processor, arg); + + SetLastAsyncOp(CacheOp.InvokeAll); + + return result; + } + + /** <inheritDoc /> */ + public ICacheLock Lock(TK key) + { + return _cache.Lock(key); + } + + /** <inheritDoc /> */ + public ICacheLock LockAll(IEnumerable<TK> keys) + { + return _cache.LockAll(keys); + } + + /** <inheritDoc /> */ + public bool IsLocalLocked(TK key, bool byCurrentThread) + { + return _cache.IsLocalLocked(key, byCurrentThread); + } + + /** <inheritDoc /> */ + public ICacheMetrics GetMetrics() + { + return _cache.GetMetrics(); + } + + /** <inheritDoc /> */ + public IFuture Rebalance() + { + return _cache.Rebalance(); + } + + /** <inheritDoc /> */ + public ICache<TK, TV> WithNoRetries() + { + return _cache.IsNoRetries ? this : new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>) _cache.WithNoRetries()); + } + + /// <summary> + /// Sets the last asynchronous op id. + /// </summary> + /// <param name="opId">The op identifier.</param> + private void SetLastAsyncOp(CacheOp opId) + { + if (IsAsync) + _lastAsyncOp.Value = (int) opId; + } + + /// <summary> + /// Clears the last asynchronous op id. + /// This should be called in the end of each method that supports async and does not call SetLastAsyncOp. + /// </summary> + private void ClearLastAsyncOp() + { + if (IsAsync) + _lastAsyncOp.Value = PlatformTarget.OpNone; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs new file mode 100644 index 0000000..8d9dfef --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Event +{ + using Apache.Ignite.Core.Cache.Event; + + /// <summary> + /// Cache entry create event. + /// </summary> + internal class CacheEntryCreateEvent<TK, TV> : ICacheEntryEvent<TK, TV> + { + /** Key.*/ + private readonly TK _key; + + /** Value.*/ + private readonly TV _val; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="key">Key.</param> + /// <param name="val">Value.</param> + public CacheEntryCreateEvent(TK key, TV val) + { + _key = key; + _val = val; + } + + /** <inheritdoc /> */ + public TK Key + { + get { return _key; } + } + + /** <inheritdoc /> */ + public TV Value + { + get { return _val; } + } + + /** <inheritdoc /> */ + public TV OldValue + { + get { return default(TV); } + } + + /** <inheritdoc /> */ + public bool HasOldValue + { + get { return false; } + } + + /** <inheritdoc /> */ + public CacheEntryEventType EventType + { + get { return CacheEntryEventType.Created; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs new file mode 100644 index 0000000..a44a800 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Event +{ + using Apache.Ignite.Core.Cache.Event; + + /// <summary> + /// Cache entry remove event. + /// </summary> + internal class CacheEntryRemoveEvent<TK, TV> : ICacheEntryEvent<TK, TV> + { + /** Key.*/ + private readonly TK _key; + + /** Old value.*/ + private readonly TV _oldVal; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="key">Key.</param> + /// <param name="oldVal">Old value.</param> + public CacheEntryRemoveEvent(TK key, TV oldVal) + { + _key = key; + _oldVal = oldVal; + } + + /** <inheritdoc /> */ + public TK Key + { + get { return _key; } + } + + /** <inheritdoc /> */ + public TV Value + { + get { return default(TV); } + } + + /** <inheritdoc /> */ + public TV OldValue + { + get { return _oldVal; } + } + + /** <inheritdoc /> */ + public bool HasOldValue + { + get { return true; } + } + + /** <inheritdoc /> */ + public CacheEntryEventType EventType + { + get { return CacheEntryEventType.Removed; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs new file mode 100644 index 0000000..e6fb927 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Event +{ + using Apache.Ignite.Core.Cache.Event; + + /// <summary> + /// Cache entry update event. + /// </summary> + internal class CacheEntryUpdateEvent<TK, TV> : ICacheEntryEvent<TK, TV> + { + /** Key.*/ + private readonly TK _key; + + /** Value.*/ + private readonly TV _val; + + /** Old value.*/ + private readonly TV _oldVal; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="key">Key.</param> + /// <param name="oldVal">Old value.</param> + /// <param name="val">Value.</param> + public CacheEntryUpdateEvent(TK key, TV oldVal, TV val) + { + _key = key; + _oldVal = oldVal; + _val = val; + } + + /** <inheritdoc /> */ + public TK Key + { + get { return _key; } + } + + /** <inheritdoc /> */ + public TV Value + { + get { return _val; } + } + + /** <inheritdoc /> */ + public TV OldValue + { + get { return _oldVal; } + } + + /** <inheritdoc /> */ + public bool HasOldValue + { + get { return true; } + } + + /** <inheritdoc /> */ + public CacheEntryEventType EventType + { + get { return CacheEntryEventType.Updated; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/MutableCacheEntry.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/MutableCacheEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/MutableCacheEntry.cs new file mode 100644 index 0000000..2c69043 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/MutableCacheEntry.cs @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using System; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Impl.Common; + + /// <summary> + /// Represents a cache entry. + /// </summary> + internal class MutableCacheEntry<TK, TV> : IMutableCacheEntry<TK, TV>, IMutableCacheEntryInternal + { + // Entry value + private TV _value; + + /// <summary> + /// Initializes a new instance of the <see cref="MutableCacheEntry{K, V}"/> class. + /// </summary> + /// <param name="key">The key.</param> + public MutableCacheEntry(TK key) + { + Key = key; + } + + /// <summary> + /// Initializes a new instance of the <see cref="MutableCacheEntry{K, V}"/> class. + /// </summary> + /// <param name="key">The key.</param> + /// <param name="value">The value.</param> + public MutableCacheEntry(TK key, TV value) + { + Key = key; + _value = value; + Exists = true; + } + + /** <inheritdoc /> */ + public TK Key { get; private set; } + + /** <inheritdoc /> */ + object IMutableCacheEntryInternal.Key + { + get { return Key; } + } + + /** <inheritdoc /> */ + public TV Value + { + get { return _value; } + set + { + _value = value; + Exists = true; + State = MutableCacheEntryState.ValueSet; + } + } + + /** <inheritdoc /> */ + object IMutableCacheEntryInternal.Value + { + get { return Value; } + } + + /** <inheritdoc /> */ + public bool Exists { get; private set; } + + /** <inheritdoc /> */ + public void Remove() + { + Value = default(TV); + Exists = false; + State = MutableCacheEntryState.Removed; + } + + /** <inheritdoc /> */ + public MutableCacheEntryState State { get; private set; } + } + + /// <summary> + /// Internal non-generic representation of a mutable cache entry. + /// </summary> + internal interface IMutableCacheEntryInternal + { + /// <summary> + /// Gets the key. + /// </summary> + object Key { get; } + + /// <summary> + /// Gets the value. + /// </summary> + object Value { get; } + + /// <summary> + /// Gets a value indicating whether cache entry exists. + /// </summary> + bool Exists { get; } + + /// <summary> + /// Gets the state indicating user operation on this instance. + /// </summary> + MutableCacheEntryState State { get; } + } + + /// <summary> + /// Mutable cache entry factory. + /// </summary> + internal static class MutableCacheEntry + { + private static readonly CopyOnWriteConcurrentDictionary<Tuple<Type, Type>, Func<object, object, bool, IMutableCacheEntryInternal>> + Ctors = new CopyOnWriteConcurrentDictionary<Tuple<Type, Type>, Func<object, object, bool, IMutableCacheEntryInternal>>(); + + public static Func<object, object, bool, IMutableCacheEntryInternal> GetCtor(Type keyType, Type valType) + { + Func<object, object, bool, IMutableCacheEntryInternal> result; + var funcKey = new Tuple<Type, Type>(keyType, valType); + + return Ctors.TryGetValue(funcKey, out result) + ? result + : Ctors.GetOrAdd(funcKey, x => + { + var entryType = typeof (MutableCacheEntry<,>).MakeGenericType(keyType, valType); + + var oneArg = DelegateConverter.CompileCtor<Func<object, IMutableCacheEntryInternal>>(entryType, + new[] {keyType}, false); + + var twoArg = + DelegateConverter.CompileCtor<Func<object, object, IMutableCacheEntryInternal>>(entryType, + new[] {keyType, valType}, false); + + return (k, v, exists) => exists ? twoArg(k, v) : oneArg(k); + }); + } + } + + /// <summary> + /// Represents result of user operation on a mutable cache entry. + /// </summary> + internal enum MutableCacheEntryState : byte + { + Intact = 0, + ValueSet = 1, + Removed = 2, + ErrPortable = 3, + ErrString = 4 + } +} \ No newline at end of file
