http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index af230b3..4ceb292 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -22,12 +22,11 @@ namespace Apache.Ignite.Core.Impl.Cache using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; - using System.Threading; + using System.Threading.Tasks; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Expiry; using Apache.Ignite.Core.Cache.Query; using Apache.Ignite.Core.Cache.Query.Continuous; - using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Cache.Query; using Apache.Ignite.Core.Impl.Cache.Query.Continuous; using Apache.Ignite.Core.Impl.Common; @@ -67,13 +66,8 @@ namespace Apache.Ignite.Core.Impl.Cache /** Flag: no-retries.*/ private readonly bool _flagNoRetries; - /** - * Result converter for async InvokeAll operation. - * In future result processing there is only one TResult generic argument, - * and we can't get the type of ICacheEntryProcessorResult at compile time from it. - * This field caches converter for the last InvokeAll operation to avoid using reflection. - */ - private readonly ThreadLocal<object> _invokeAllConverter = new ThreadLocal<object>(); + /** Async instance. */ + private readonly Lazy<CacheImpl<TK, TV>> _asyncInstance; /// <summary> /// Constructor. @@ -93,54 +87,62 @@ namespace Apache.Ignite.Core.Impl.Cache _flagKeepPortable = flagKeepPortable; _flagAsync = flagAsync; _flagNoRetries = flagNoRetries; + + _asyncInstance = new Lazy<CacheImpl<TK, TV>>(() => new CacheImpl<TK, TV>(this)); } - /** <inheritDoc /> */ - public IIgnite Ignite + /// <summary> + /// Initializes a new async instance. + /// </summary> + /// <param name="cache">The cache.</param> + private CacheImpl(CacheImpl<TK, TV> cache) : base(UU.CacheWithAsync(cache.Target), cache.Marshaller) { - get - { - return _ignite; - } + _ignite = cache._ignite; + _flagSkipStore = cache._flagSkipStore; + _flagKeepPortable = cache._flagKeepPortable; + _flagAsync = true; + _flagNoRetries = cache._flagNoRetries; } /** <inheritDoc /> */ - public bool IsAsync + public IIgnite Ignite { - get { return _flagAsync; } + get { return _ignite; } } /** <inheritDoc /> */ - public IFuture GetFuture() + private bool IsAsync { - throw new NotSupportedException("GetFuture() should be called through CacheProxyImpl"); + get { return _flagAsync; } } - /** <inheritDoc /> */ - public IFuture<TResult> GetFuture<TResult>() + /// <summary> + /// Gets and resets task for previous asynchronous operation. + /// </summary> + /// <param name="lastAsyncOp">The last async op id.</param> + /// <returns> + /// Task for previous asynchronous operation. + /// </returns> + private Task GetTask(CacheOp lastAsyncOp) { - throw new NotSupportedException("GetFuture() should be called through CacheProxyImpl"); + return GetTask<object>(lastAsyncOp); } /// <summary> - /// Gets and resets future for previous asynchronous operation. + /// Gets and resets task for previous asynchronous operation. /// </summary> + /// <typeparam name="TResult">The type of the result.</typeparam> /// <param name="lastAsyncOp">The last async op id.</param> + /// <param name="converter">The converter.</param> /// <returns> - /// Future for previous asynchronous operation. + /// Task for previous asynchronous operation. /// </returns> - /// <exception cref="System.InvalidOperationException">Asynchronous mode is disabled</exception> - internal IFuture<TResult> GetFuture<TResult>(CacheOp lastAsyncOp) + private Task<TResult> GetTask<TResult>(CacheOp lastAsyncOp, Func<PortableReaderImpl, TResult> converter = null) { - if (!_flagAsync) - throw IgniteUtils.GetAsyncModeDisabledException(); - - var converter = GetFutureResultConverter<TResult>(lastAsyncOp); - - _invokeAllConverter.Value = null; + Debug.Assert(_flagAsync); return GetFuture((futId, futTypeId) => UU.TargetListenFutureForOperation(Target, futId, futTypeId, - (int) lastAsyncOp), _flagKeepPortable, converter); + (int) lastAsyncOp), _flagKeepPortable, converter).Task; } /** <inheritDoc /> */ @@ -151,6 +153,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ + /** <inheritDoc /> */ public bool IsEmpty() { return GetSize() == 0; @@ -225,13 +228,6 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ - public ICache<TK, TV> WithAsync() - { - return _flagAsync ? this : new CacheImpl<TK, TV>(_ignite, UU.CacheWithAsync(Target), Marshaller, - _flagSkipStore, _flagKeepPortable, true, _flagNoRetries); - } - - /** <inheritDoc /> */ public bool IsKeepPortable { get { return _flagKeepPortable; } @@ -244,11 +240,27 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public Task LoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args) + { + AsyncInstance.LoadCache(p, args); + + return AsyncInstance.GetTask(CacheOp.LoadCache); + } + + /** <inheritDoc /> */ public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) { LoadCache0(p, args, (int)CacheOp.LocLoadCache); } + /** <inheritDoc /> */ + public Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args) + { + AsyncInstance.LocalLoadCache(p, args); + + return AsyncInstance.GetTask(CacheOp.LocLoadCache); + } + /// <summary> /// Loads the cache. /// </summary> @@ -276,7 +288,15 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); return DoOutOp((int)CacheOp.ContainsKey, key) == True; - } + } + + /** <inheritDoc /> */ + public Task<bool> ContainsKeyAsync(TK key) + { + AsyncInstance.ContainsKey(key); + + return AsyncInstance.GetTask<bool>(CacheOp.ContainsKey); + } /** <inheritDoc /> */ public bool ContainsKeys(IEnumerable<TK> keys) @@ -284,7 +304,15 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(keys, "keys"); return DoOutOp((int)CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)) == True; - } + } + + /** <inheritDoc /> */ + public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys) + { + AsyncInstance.ContainsKeys(keys); + + return AsyncInstance.GetTask<bool>(CacheOp.ContainsKeys); + } /** <inheritDoc /> */ public TV LocalPeek(TK key, params CachePeekMode[] modes) @@ -355,6 +383,20 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public Task<TV> GetAsync(TK key) + { + AsyncInstance.Get(key); + + return AsyncInstance.GetTask(CacheOp.Get, reader => + { + if (reader != null) + return reader.ReadObject<TV>(); + + throw GetKeyNotFoundException(); + }); + } + + /** <inheritDoc /> */ public bool TryGet(TK key, out TV value) { IgniteArgumentCheck.NotNull(key, "key"); @@ -370,6 +412,16 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public Task<CacheResult<TV>> TryGetAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + AsyncInstance.Get(key); + + return AsyncInstance.GetTask(CacheOp.Get, GetCacheResult); + } + + /** <inheritDoc /> */ public IDictionary<TK, TV> GetAll(IEnumerable<TK> keys) { IgniteArgumentCheck.NotNull(keys, "keys"); @@ -384,6 +436,14 @@ namespace Apache.Ignite.Core.Impl.Cache }); } + /** <inheritDoc /> */ + public Task<IDictionary<TK, TV>> GetAllAsync(IEnumerable<TK> keys) + { + AsyncInstance.GetAll(keys); + + return AsyncInstance.GetTask(CacheOp.GetAll, r => r == null ? null : ReadGetAllDictionary(r)); + } + /** <inheritdoc /> */ public void Put(TK key, TV val) { @@ -395,6 +455,14 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public Task PutAsync(TK key, TV val) + { + AsyncInstance.Put(key, val); + + return AsyncInstance.GetTask(CacheOp.Put); + } + + /** <inheritDoc /> */ public CacheResult<TV> GetAndPut(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); @@ -405,13 +473,29 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val) + { + AsyncInstance.GetAndPut(key, val); + + return AsyncInstance.GetTask(CacheOp.GetAndPut, GetCacheResult); + } + + /** <inheritDoc /> */ public CacheResult<TV> GetAndReplace(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable<TK, TV, TV>((int)CacheOp.GetAndReplace, key, val); + return DoOutInOpNullable<TK, TV, TV>((int) CacheOp.GetAndReplace, key, val); + } + + /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val) + { + AsyncInstance.GetAndReplace(key, val); + + return AsyncInstance.GetTask(CacheOp.GetAndReplace, GetCacheResult); } /** <inheritDoc /> */ @@ -422,6 +506,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOpNullable<TK, TV>((int)CacheOp.GetAndRemove, key); } + /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndRemoveAsync(TK key) + { + AsyncInstance.GetAndRemove(key); + + return AsyncInstance.GetTask(CacheOp.GetAndRemove, GetCacheResult); + } + /** <inheritdoc /> */ public bool PutIfAbsent(TK key, TV val) { @@ -432,6 +524,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutOp((int) CacheOp.PutIfAbsent, key, val) == True; } + /** <inheritDoc /> */ + public Task<bool> PutIfAbsentAsync(TK key, TV val) + { + AsyncInstance.PutIfAbsent(key, val); + + return AsyncInstance.GetTask<bool>(CacheOp.PutIfAbsent); + } + /** <inheritdoc /> */ public CacheResult<TV> GetAndPutIfAbsent(TK key, TV val) { @@ -442,6 +542,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOpNullable<TK, TV, TV>((int)CacheOp.GetAndPutIfAbsent, key, val); } + /** <inheritDoc /> */ + public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val) + { + AsyncInstance.GetAndPutIfAbsent(key, val); + + return AsyncInstance.GetTask(CacheOp.GetAndPutIfAbsent, GetCacheResult); + } + /** <inheritdoc /> */ public bool Replace(TK key, TV val) { @@ -449,7 +557,15 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int)CacheOp.Replace2, key, val) == True; + return DoOutOp((int) CacheOp.Replace2, key, val) == True; + } + + /** <inheritDoc /> */ + public Task<bool> ReplaceAsync(TK key, TV val) + { + AsyncInstance.Replace(key, val); + + return AsyncInstance.GetTask<bool>(CacheOp.Replace2); } /** <inheritdoc /> */ @@ -464,6 +580,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutOp((int)CacheOp.Replace3, key, oldVal, newVal) == True; } + /** <inheritDoc /> */ + public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal) + { + AsyncInstance.Replace(key, oldVal, newVal); + + return AsyncInstance.GetTask<bool>(CacheOp.Replace3); + } + /** <inheritdoc /> */ public void PutAll(IDictionary<TK, TV> vals) { @@ -471,7 +595,15 @@ namespace Apache.Ignite.Core.Impl.Cache DoOutOp((int) CacheOp.PutAll, writer => WriteDictionary(writer, vals)); } - + + /** <inheritDoc /> */ + public Task PutAllAsync(IDictionary<TK, TV> vals) + { + AsyncInstance.PutAll(vals); + + return AsyncInstance.GetTask(CacheOp.PutAll); + } + /** <inheritdoc /> */ public void LocalEvict(IEnumerable<TK> keys) { @@ -486,12 +618,28 @@ namespace Apache.Ignite.Core.Impl.Cache UU.CacheClear(Target); } + /** <inheritDoc /> */ + public Task ClearAsync() + { + AsyncInstance.Clear(); + + return AsyncInstance.GetTask(); + } + /** <inheritdoc /> */ public void Clear(TK key) { IgniteArgumentCheck.NotNull(key, "key"); - DoOutOp((int)CacheOp.Clear, key); + DoOutOp((int) CacheOp.Clear, key); + } + + /** <inheritDoc /> */ + public Task ClearAsync(TK key) + { + AsyncInstance.Clear(key); + + return AsyncInstance.GetTask(CacheOp.Clear); } /** <inheritdoc /> */ @@ -502,6 +650,14 @@ namespace Apache.Ignite.Core.Impl.Cache DoOutOp((int)CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); } + /** <inheritDoc /> */ + public Task ClearAllAsync(IEnumerable<TK> keys) + { + AsyncInstance.ClearAll(keys); + + return AsyncInstance.GetTask(CacheOp.ClearAll); + } + /** <inheritdoc /> */ public void LocalClear(TK key) { @@ -523,7 +679,15 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int)CacheOp.RemoveObj, key) == True; + return DoOutOp((int) CacheOp.RemoveObj, key) == True; + } + + /** <inheritDoc /> */ + public Task<bool> RemoveAsync(TK key) + { + AsyncInstance.Remove(key); + + return AsyncInstance.GetTask<bool>(CacheOp.RemoveObj); } /** <inheritDoc /> */ @@ -537,6 +701,14 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public Task<bool> RemoveAsync(TK key, TV val) + { + AsyncInstance.Remove(key, val); + + return AsyncInstance.GetTask<bool>(CacheOp.RemoveBool); + } + + /** <inheritDoc /> */ public void RemoveAll(IEnumerable<TK> keys) { IgniteArgumentCheck.NotNull(keys, "keys"); @@ -545,12 +717,28 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ + public Task RemoveAllAsync(IEnumerable<TK> keys) + { + AsyncInstance.RemoveAll(keys); + + return AsyncInstance.GetTask(CacheOp.RemoveAll); + } + + /** <inheritDoc /> */ public void RemoveAll() { UU.CacheRemoveAll(Target); } /** <inheritDoc /> */ + public Task RemoveAllAsync() + { + AsyncInstance.RemoveAll(); + + return AsyncInstance.GetTask(); + } + + /** <inheritDoc /> */ public int GetLocalSize(params CachePeekMode[] modes) { return Size0(true, modes); @@ -562,6 +750,14 @@ namespace Apache.Ignite.Core.Impl.Cache return Size0(false, modes); } + /** <inheritDoc /> */ + public Task<int> GetSizeAsync(params CachePeekMode[] modes) + { + AsyncInstance.GetSize(modes); + + return AsyncInstance.GetTask<int>(); + } + /// <summary> /// Internal size routine. /// </summary> @@ -601,6 +797,25 @@ namespace Apache.Ignite.Core.Impl.Cache input => GetResultOrThrow<TRes>(Unmarshal<object>(input))); } + /** <inheritDoc /> */ + public Task<TRes> InvokeAsync<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) + { + AsyncInstance.Invoke(key, processor, arg); + + return AsyncInstance.GetTask(CacheOp.Invoke, r => + { + if (r == null) + return default(TRes); + + var hasError = r.ReadBoolean(); + + if (hasError) + throw ReadException(r.Stream); + + return r.ReadObject<TRes>(); + }); + } + /** <inheritdoc /> */ public IDictionary<TK, ICacheEntryProcessorResult<TRes>> InvokeAll<TArg, TRes>(IEnumerable<TK> keys, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) @@ -612,19 +827,21 @@ namespace Apache.Ignite.Core.Impl.Cache var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV)); - return DoOutInOp((int)CacheOp.InvokeAll, writer => - { - WriteEnumerable(writer, keys); - writer.Write(holder); - }, - input => - { - if (IsAsync) - _invokeAllConverter.Value = (Func<PortableReaderImpl, IDictionary<TK, ICacheEntryProcessorResult<TRes>>>) - (reader => ReadInvokeAllResults<TRes>(reader.Stream)); + return DoOutInOp((int) CacheOp.InvokeAll, + writer => + { + WriteEnumerable(writer, keys); + writer.Write(holder); + }, + input => ReadInvokeAllResults<TRes>(input)); + } - return ReadInvokeAllResults<TRes>(input); - }); + /** <inheritDoc /> */ + public Task<IDictionary<TK, ICacheEntryProcessorResult<TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) + { + AsyncInstance.InvokeAll(keys, processor, arg); + + return AsyncInstance.GetTask(CacheOp.InvokeAll, reader => ReadInvokeAllResults<TRes>(reader.Stream)); } /** <inheritdoc /> */ @@ -673,9 +890,9 @@ namespace Apache.Ignite.Core.Impl.Cache } /** <inheritDoc /> */ - public IFuture Rebalance() + public Task Rebalance() { - return GetFuture<object>((futId, futTyp) => UU.CacheRebalance(Target, futId)); + return GetFuture<object>((futId, futTyp) => UU.CacheRebalance(Target, futId)).Task; } /** <inheritDoc /> */ @@ -689,11 +906,11 @@ namespace Apache.Ignite.Core.Impl.Cache } /// <summary> - /// Gets a value indicating whether this instance is in no-retries mode. + /// Gets the asynchronous instance. /// </summary> - internal bool IsNoRetries + private CacheImpl<TK, TV> AsyncInstance { - get { return _flagNoRetries; } + get { return _asyncInstance.Value; } } #region Queries @@ -970,59 +1187,15 @@ namespace Apache.Ignite.Core.Impl.Cache } /// <summary> - /// Gets the future result converter based on the last operation id. + /// Gets the cache result. /// </summary> - /// <typeparam name="TResult">The type of the future result.</typeparam> - /// <param name="lastAsyncOpId">The last op id.</param> - /// <returns>Future result converter.</returns> - private Func<PortableReaderImpl, TResult> GetFutureResultConverter<TResult>(CacheOp lastAsyncOpId) + private static CacheResult<TV> GetCacheResult(PortableReaderImpl reader) { - switch (lastAsyncOpId) - { - case CacheOp.Get: - return reader => - { - if (reader != null) - return reader.ReadObject<TResult>(); - - throw GetKeyNotFoundException(); - }; - - case CacheOp.GetAll: - return reader => reader == null ? default(TResult) : (TResult) ReadGetAllDictionary(reader); + var res = reader == null + ? new CacheResult<TV>() + : new CacheResult<TV>(reader.ReadObject<TV>()); - case CacheOp.Invoke: - return reader => - { - if (reader == null) - return default(TResult); - - var hasError = reader.ReadBoolean(); - - if (hasError) - throw ReadException(reader.Stream); - - return reader.ReadObject<TResult>(); - }; - - case CacheOp.InvokeAll: - return _invokeAllConverter.Value as Func<PortableReaderImpl, TResult>; - - case CacheOp.GetAndPut: - case CacheOp.GetAndPutIfAbsent: - case CacheOp.GetAndRemove: - case CacheOp.GetAndReplace: - return reader => - { - var res = reader == null - ? new CacheResult<TV>() - : new CacheResult<TV>(reader.ReadObject<TV>()); - - return TypeCaster<TResult>.Cast(res); - }; - } - - return null; + return res; } /// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs deleted file mode 100644 index aaaf8c3..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache -{ - using System.Collections; - using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Cache.Expiry; - using Apache.Ignite.Core.Cache.Query; - using Apache.Ignite.Core.Cache.Query.Continuous; - using Apache.Ignite.Core.Common; - - /// <summary> - /// Cache proxy. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class CacheProxyImpl<TK, TV> : ICache<TK, TV> - { - /** wrapped cache instance */ - private readonly CacheImpl<TK, TV> _cache; - - /** */ - private readonly ThreadLocal<CacheOp> _lastAsyncOp = new ThreadLocal<CacheOp>(() => CacheOp.None); - - /// <summary> - /// Initializes a new instance of the <see cref="CacheProxyImpl{K, V}"/> class. - /// </summary> - /// <param name="cache">The cache to wrap.</param> - public CacheProxyImpl(CacheImpl<TK, TV> cache) - { - Debug.Assert(cache != null); - - _cache = cache; - } - - /** <inheritDoc /> */ - public ICache<TK, TV> WithSkipStore() - { - return _cache.IsSkipStore ? this : new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>)_cache.WithSkipStore()); - } - - /** <inheritDoc /> */ - public ICache<TK, TV> WithExpiryPolicy(IExpiryPolicy plc) - { - return new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>)_cache.WithExpiryPolicy(plc)); - } - - /** <inheritDoc /> */ - public ICache<TK, TV> WithAsync() - { - return IsAsync ? this : new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>) _cache.WithAsync()); - } - - /** <inheritDoc /> */ - public bool IsAsync - { - get { return _cache.IsAsync; } - } - - /** <inheritDoc /> */ - public IFuture GetFuture() - { - return GetFuture<object>(); - } - - /** <inheritDoc /> */ - public IFuture<TResult> GetFuture<TResult>() - { - var fut = _cache.GetFuture<TResult>(_lastAsyncOp.Value); - - ClearLastAsyncOp(); - - return fut; - } - - /** <inheritDoc /> */ - public IEnumerator<ICacheEntry<TK, TV>> GetEnumerator() - { - return _cache.GetEnumerator(); - } - - /** <inheritDoc /> */ - IEnumerator IEnumerable.GetEnumerator() - { - return ((IEnumerable) _cache).GetEnumerator(); - } - - /** <inheritDoc /> */ - public string Name - { - get { return _cache.Name; } - } - - /** <inheritDoc /> */ - public IIgnite Ignite - { - get { return _cache.Ignite; } - } - - /** <inheritDoc /> */ - - public bool IsEmpty() - { - return _cache.IsEmpty(); - } - - /** <inheritDoc /> */ - public bool IsKeepPortable - { - get { return _cache.IsKeepPortable; } - } - - /// <summary> - /// Skip store flag. - /// </summary> - internal bool SkipStore - { - get { return _cache.IsSkipStore; } - } - - /** <inheritDoc /> */ - public ICache<TK1, TV1> WithKeepPortable<TK1, TV1>() - { - return new CacheProxyImpl<TK1, TV1>((CacheImpl<TK1, TV1>) _cache.WithKeepPortable<TK1, TV1>()); - } - - /** <inheritDoc /> */ - public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) - { - _cache.LoadCache(p, args); - - SetLastAsyncOp(CacheOp.LoadCache); - } - - /** <inheritDoc /> */ - public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) - { - _cache.LocalLoadCache(p, args); - - SetLastAsyncOp(CacheOp.LocLoadCache); - } - - /** <inheritDoc /> */ - public bool ContainsKey(TK key) - { - var result = _cache.ContainsKey(key); - - SetLastAsyncOp(CacheOp.ContainsKey); - - return result; - } - - /** <inheritDoc /> */ - public bool ContainsKeys(IEnumerable<TK> keys) - { - var result = _cache.ContainsKeys(keys); - - SetLastAsyncOp(CacheOp.ContainsKeys); - - return result; - } - - /** <inheritDoc /> */ - public TV LocalPeek(TK key, params CachePeekMode[] modes) - { - return _cache.LocalPeek(key, modes); - } - - /** <inheritDoc /> */ - public bool TryLocalPeek(TK key, out TV value, params CachePeekMode[] modes) - { - return _cache.TryLocalPeek(key, out value, modes); - } - - /** <inheritDoc /> */ - public TV this[TK key] - { - get { return _cache[key]; } - set { _cache[key] = value; } - } - - /** <inheritDoc /> */ - public TV Get(TK key) - { - var result = _cache.Get(key); - - SetLastAsyncOp(CacheOp.Get); - - return result; - } - - /** <inheritDoc /> */ - public bool TryGet(TK key, out TV value) - { - return _cache.TryGet(key, out value); - } - - /** <inheritDoc /> */ - public IDictionary<TK, TV> GetAll(IEnumerable<TK> keys) - { - var result = _cache.GetAll(keys); - - SetLastAsyncOp(CacheOp.GetAll); - - return result; - } - - /** <inheritDoc /> */ - public void Put(TK key, TV val) - { - _cache.Put(key, val); - - SetLastAsyncOp(CacheOp.Put); - } - - /** <inheritDoc /> */ - public CacheResult<TV> GetAndPut(TK key, TV val) - { - var result = _cache.GetAndPut(key, val); - - SetLastAsyncOp(CacheOp.GetAndPut); - - return result; - } - - /** <inheritDoc /> */ - public CacheResult<TV> GetAndReplace(TK key, TV val) - { - var result = _cache.GetAndReplace(key, val); - - SetLastAsyncOp(CacheOp.GetAndReplace); - - return result; - } - - /** <inheritDoc /> */ - public CacheResult<TV> GetAndRemove(TK key) - { - var result = _cache.GetAndRemove(key); - - SetLastAsyncOp(CacheOp.GetAndRemove); - - return result; - } - - /** <inheritDoc /> */ - public bool PutIfAbsent(TK key, TV val) - { - var result = _cache.PutIfAbsent(key, val); - - SetLastAsyncOp(CacheOp.PutIfAbsent); - - return result; - } - - /** <inheritDoc /> */ - public CacheResult<TV> GetAndPutIfAbsent(TK key, TV val) - { - var result = _cache.GetAndPutIfAbsent(key, val); - - SetLastAsyncOp(CacheOp.GetAndPutIfAbsent); - - return result; - } - - /** <inheritDoc /> */ - public bool Replace(TK key, TV val) - { - var result = _cache.Replace(key, val); - - SetLastAsyncOp(CacheOp.Replace2); - - return result; - } - - /** <inheritDoc /> */ - public bool Replace(TK key, TV oldVal, TV newVal) - { - var result = _cache.Replace(key, oldVal, newVal); - - SetLastAsyncOp(CacheOp.Replace3); - - return result; - } - - /** <inheritDoc /> */ - public void PutAll(IDictionary<TK, TV> vals) - { - _cache.PutAll(vals); - - SetLastAsyncOp(CacheOp.PutAll); - } - - /** <inheritDoc /> */ - public void LocalEvict(IEnumerable<TK> keys) - { - _cache.LocalEvict(keys); - } - - /** <inheritDoc /> */ - public void Clear() - { - _cache.Clear(); - - ClearLastAsyncOp(); - } - - /** <inheritDoc /> */ - public void Clear(TK key) - { - _cache.Clear(key); - - SetLastAsyncOp(CacheOp.Clear); - } - - /** <inheritDoc /> */ - public void ClearAll(IEnumerable<TK> keys) - { - _cache.ClearAll(keys); - - SetLastAsyncOp(CacheOp.ClearAll); - } - - /** <inheritDoc /> */ - public void LocalClear(TK key) - { - _cache.LocalClear(key); - } - - /** <inheritDoc /> */ - public void LocalClearAll(IEnumerable<TK> keys) - { - _cache.LocalClearAll(keys); - } - - /** <inheritDoc /> */ - public bool Remove(TK key) - { - var result = _cache.Remove(key); - - SetLastAsyncOp(CacheOp.RemoveObj); - - return result; - } - - /** <inheritDoc /> */ - public bool Remove(TK key, TV val) - { - var result = _cache.Remove(key, val); - - SetLastAsyncOp(CacheOp.RemoveBool); - - return result; - } - - /** <inheritDoc /> */ - public void RemoveAll(IEnumerable<TK> keys) - { - _cache.RemoveAll(keys); - - SetLastAsyncOp(CacheOp.RemoveAll); - } - - /** <inheritDoc /> */ - public void RemoveAll() - { - _cache.RemoveAll(); - - ClearLastAsyncOp(); - } - - /** <inheritDoc /> */ - public int GetLocalSize(params CachePeekMode[] modes) - { - return _cache.GetLocalSize(modes); - } - - /** <inheritDoc /> */ - public int GetSize(params CachePeekMode[] modes) - { - var result = _cache.GetSize(modes); - - ClearLastAsyncOp(); - - return result; - } - - /** <inheritDoc /> */ - public void LocalPromote(IEnumerable<TK> keys) - { - _cache.LocalPromote(keys); - } - - /** <inheritDoc /> */ - public IQueryCursor<ICacheEntry<TK, TV>> Query(QueryBase qry) - { - return _cache.Query(qry); - } - - /** <inheritDoc /> */ - public IQueryCursor<IList> QueryFields(SqlFieldsQuery qry) - { - return _cache.QueryFields(qry); - } - - /** <inheritDoc /> */ - public IContinuousQueryHandle QueryContinuous(ContinuousQuery<TK, TV> qry) - { - return _cache.QueryContinuous(qry); - } - - /** <inheritDoc /> */ - public IContinuousQueryHandle<ICacheEntry<TK, TV>> QueryContinuous(ContinuousQuery<TK, TV> qry, QueryBase initialQry) - { - return _cache.QueryContinuous(qry, initialQry); - } - - /** <inheritDoc /> */ - public IEnumerable<ICacheEntry<TK, TV>> GetLocalEntries(params CachePeekMode[] peekModes) - { - return _cache.GetLocalEntries(peekModes); - } - - /** <inheritDoc /> */ - public TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) - { - var result = _cache.Invoke(key, processor, arg); - - SetLastAsyncOp(CacheOp.Invoke); - - return result; - } - - /** <inheritDoc /> */ - public IDictionary<TK, ICacheEntryProcessorResult<TRes>> InvokeAll<TArg, TRes>(IEnumerable<TK> keys, - ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg) - { - var result = _cache.InvokeAll(keys, processor, arg); - - SetLastAsyncOp(CacheOp.InvokeAll); - - return result; - } - - /** <inheritDoc /> */ - public ICacheLock Lock(TK key) - { - return _cache.Lock(key); - } - - /** <inheritDoc /> */ - public ICacheLock LockAll(IEnumerable<TK> keys) - { - return _cache.LockAll(keys); - } - - /** <inheritDoc /> */ - public bool IsLocalLocked(TK key, bool byCurrentThread) - { - return _cache.IsLocalLocked(key, byCurrentThread); - } - - /** <inheritDoc /> */ - public ICacheMetrics GetMetrics() - { - return _cache.GetMetrics(); - } - - /** <inheritDoc /> */ - public IFuture Rebalance() - { - return _cache.Rebalance(); - } - - /** <inheritDoc /> */ - public ICache<TK, TV> WithNoRetries() - { - return _cache.IsNoRetries ? this : new CacheProxyImpl<TK, TV>((CacheImpl<TK, TV>) _cache.WithNoRetries()); - } - - /// <summary> - /// Sets the last asynchronous op id. - /// </summary> - /// <param name="opId">The op identifier.</param> - private void SetLastAsyncOp(CacheOp opId) - { - if (IsAsync) - _lastAsyncOp.Value = opId; - } - - /// <summary> - /// Clears the last asynchronous op id. - /// This should be called in the end of each method that supports async and does not call SetLastAsyncOp. - /// </summary> - private void ClearLastAsyncOp() - { - if (IsAsync) - _lastAsyncOp.Value = CacheOp.None; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs deleted file mode 100644 index 4e5c396..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Common -{ - using System; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Common; - - /// <summary> - /// Adapts IGridFuture to the IAsyncResult. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", - Justification = "Implementing IDisposable has no point since we return this class as IAsyncResult " + - "to the client, and IAsyncResult is not IDisposable.")] - public class AsyncResult : IAsyncResult - { - /** */ - private readonly ManualResetEvent _waitHandle; - - /// <summary> - /// Initializes a new instance of the <see cref="AsyncResult"/> class. - /// </summary> - /// <param name="fut">The future to wrap.</param> - public AsyncResult(IFuture fut) - { - _waitHandle = new ManualResetEvent(false); - - fut.Listen(() => _waitHandle.Set()); - } - - /** <inheritdoc /> */ - public bool IsCompleted - { - get { return _waitHandle.WaitOne(0); } - } - - /** <inheritdoc /> */ - public WaitHandle AsyncWaitHandle - { - get { return _waitHandle; } - } - - /** <inheritdoc /> */ - public object AsyncState - { - get { return null; } - } - - /** <inheritdoc /> */ - public bool CompletedSynchronously - { - get { return false; } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs deleted file mode 100644 index febe969..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Common -{ - using System; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - - /// <summary> - /// Represents an IAsyncResult that is completed. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", - Justification = "Implementing IDisposable has no point since we return this class as IAsyncResult " + - "to the client, and IAsyncResult is not IDisposable.")] - public class CompletedAsyncResult : IAsyncResult - { - /** */ - private readonly WaitHandle _asyncWaitHandle = new ManualResetEvent(true); - - /** <inheritdoc /> */ - public bool IsCompleted - { - get { return true; } - } - - /** <inheritdoc /> */ - public WaitHandle AsyncWaitHandle - { - get { return _asyncWaitHandle; } - } - - /** <inheritdoc /> */ - public object AsyncState - { - get { return null; } - } - - /** <inheritdoc /> */ - public bool CompletedSynchronously - { - get { return false; } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs index 70bebc4..4bf8a32 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs @@ -18,11 +18,8 @@ namespace Apache.Ignite.Core.Impl.Common { using System; - using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; - using System.Threading; using System.Threading.Tasks; - using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Portable.IO; /// <summary> @@ -30,22 +27,13 @@ namespace Apache.Ignite.Core.Impl.Common /// </summary> [SuppressMessage("ReSharper", "ParameterHidesMember")] [CLSCompliant(false)] - public sealed class Future<T> : IFutureInternal, IFuture<T> + public sealed class Future<T> : IFutureInternal { /** Converter. */ private readonly IFutureConverter<T> _converter; - /** Result. */ - private T _res; - - /** Caught cxception. */ - private Exception _err; - - /** Done flag. */ - private volatile bool _done; - - /** Listener(s). Either Action or List{Action}. */ - private object _callbacks; + /** Task completion source. */ + private readonly TaskCompletionSource<T> _taskCompletionSource = new TaskCompletionSource<T>(); /// <summary> /// Constructor. @@ -57,139 +45,22 @@ namespace Apache.Ignite.Core.Impl.Common } /** <inheritdoc/> */ - public bool IsDone - { - get { return _done; } - } - - /** <inheritdoc/> */ public T Get() { - if (!_done) + try { - lock (this) - { - while (!_done) - Monitor.Wait(this); - } + return Task.Result; } - - return Get0(); - } - - /** <inheritdoc/> */ - public T Get(TimeSpan timeout) - { - long ticks = timeout.Ticks; - - if (ticks < 0) - throw new ArgumentException("Timeout cannot be negative."); - - if (ticks == 0) - return Get(); - - if (!_done) + catch (AggregateException ex) { - // Fallback to locked mode. - lock (this) - { - long endTime = DateTime.Now.Ticks + ticks; - - if (!_done) - { - while (true) - { - Monitor.Wait(this, timeout); - - if (_done) - break; - - ticks = endTime - DateTime.Now.Ticks; - - if (ticks <= 0) - throw new TimeoutException("Timeout waiting for future completion."); - - timeout = TimeSpan.FromTicks(ticks); - } - } - } + throw ex.InnerException; } - - return Get0(); - } - - /** <inheritdoc/> */ - public void Listen(Action callback) - { - Listen((Action<IFuture<T>>) (fut => callback())); } /** <inheritdoc/> */ - public void Listen(Action<IFuture> callback) + public Task<T> Task { - Listen((Action<IFuture<T>>)callback); - } - - /** <inheritdoc/> */ - public void Listen(Action<IFuture<T>> callback) - { - IgniteArgumentCheck.NotNull(callback, "callback"); - - if (!_done) - { - lock (this) - { - if (!_done) - { - AddCallback(callback); - - return; - } - } - } - - callback(this); - } - - /// <summary> - /// Get result or throw an error. - /// </summary> - private T Get0() - { - if (_err != null) - throw _err; - - return _res; - } - - /** <inheritdoc/> */ - public IAsyncResult ToAsyncResult() - { - return _done ? (IAsyncResult) new CompletedAsyncResult() : new AsyncResult(this); - } - - /** <inheritdoc/> */ - Task<object> IFuture.ToTask() - { - return Task.Factory.FromAsync(ToAsyncResult(), x => (object) Get()); - } - - /** <inheritdoc/> */ - public Task<T> ToTask() - { - return Task.Factory.FromAsync(ToAsyncResult(), x => Get()); - } - - /** <inheritdoc/> */ - object IFuture.Get(TimeSpan timeout) - { - return Get(timeout); - } - - /** <inheritdoc/> */ - object IFuture.Get() - { - return Get(); + get { return _taskCompletionSource.Task; } } /** <inheritdoc /> */ @@ -209,7 +80,7 @@ namespace Apache.Ignite.Core.Impl.Common /** <inheritdoc /> */ public void OnError(Exception err) { - OnDone(default(T), err); + _taskCompletionSource.TrySetException(err); } /** <inheritdoc /> */ @@ -238,7 +109,7 @@ namespace Apache.Ignite.Core.Impl.Common /// <param name="res">Result.</param> internal void OnResult(T res) { - OnDone(res, null); + _taskCompletionSource.TrySetResult(res); } /// <summary> @@ -248,54 +119,10 @@ namespace Apache.Ignite.Core.Impl.Common /// <param name="err">Error.</param> public void OnDone(T res, Exception err) { - object callbacks0 = null; - - lock (this) - { - if (!_done) - { - _res = res; - _err = err; - - _done = true; - - Monitor.PulseAll(this); - - // Notify listeners outside the lock - callbacks0 = _callbacks; - _callbacks = null; - } - } - - if (callbacks0 != null) - { - var list = callbacks0 as List<Action<IFuture<T>>>; - - if (list != null) - list.ForEach(x => x(this)); - else - ((Action<IFuture<T>>) callbacks0)(this); - } - } - - /// <summary> - /// Adds a callback. - /// </summary> - private void AddCallback(Action<IFuture<T>> callback) - { - if (_callbacks == null) - { - _callbacks = callback; - - return; - } - - var list = _callbacks as List<Action<IFuture<T>>> ?? - new List<Action<IFuture<T>>> {(Action<IFuture<T>>) _callbacks}; - - list.Add(callback); - - _callbacks = list; + if (err != null) + OnError(err); + else + OnResult(res); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs index d7fc59f..d0e920a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs @@ -20,8 +20,8 @@ namespace Apache.Ignite.Core.Impl.Compute using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading.Tasks; using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Compute; /// <summary> @@ -44,30 +44,6 @@ namespace Apache.Ignite.Core.Impl.Compute } /** <inheritDoc /> */ - public ICompute WithAsync() - { - return new ComputeAsync(_compute); - } - - /** <inheritDoc /> */ - public bool IsAsync - { - get { return false; } - } - - /** <inheritDoc /> */ - public IFuture GetFuture() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** <inheritDoc /> */ - public IFuture<TResult> GetFuture<TResult>() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** <inheritDoc /> */ public IClusterGroup ClusterGroup { get { return _compute.ClusterGroup; } @@ -104,53 +80,108 @@ namespace Apache.Ignite.Core.Impl.Compute } /** <inheritDoc /> */ + public Task<TRes> ExecuteJavaTaskAsync<TRes>(string taskName, object taskArg) + { + return _compute.ExecuteJavaTaskAsync<TRes>(taskName, taskArg).Task; + } + + /** <inheritDoc /> */ public TReduceRes Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task, TArg taskArg) { return _compute.Execute(task, taskArg).Get(); } /** <inheritDoc /> */ + public Task<TRes> ExecuteAsync<TArg, TJobRes, TRes>(IComputeTask<TArg, TJobRes, TRes> task, TArg taskArg) + { + return _compute.Execute(task, taskArg).Task; + } + + /** <inheritDoc /> */ public TJobRes Execute<TArg, TJobRes>(IComputeTask<TArg, TJobRes> task) { return _compute.Execute(task, null).Get(); } /** <inheritDoc /> */ + public Task<TRes> ExecuteAsync<TJobRes, TRes>(IComputeTask<TJobRes, TRes> task) + { + return _compute.Execute(task, null).Task; + } + + /** <inheritDoc /> */ public TReduceRes Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg) { return _compute.Execute<TArg, TJobRes, TReduceRes>(taskType, taskArg).Get(); } + /** <inheritDoc /> */ + public Task<TReduceRes> ExecuteAsync<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg) + { + return _compute.Execute<TArg, TJobRes, TReduceRes>(taskType, taskArg).Task; + } + + /** <inheritDoc /> */ public TReduceRes Execute<TArg, TReduceRes>(Type taskType) { return _compute.Execute<object, TArg, TReduceRes>(taskType, null).Get(); } /** <inheritDoc /> */ + public Task<TReduceRes> ExecuteAsync<TArg, TReduceRes>(Type taskType) + { + return _compute.Execute<object, TArg, TReduceRes>(taskType, null).Task; + } + + /** <inheritDoc /> */ public TJobRes Call<TJobRes>(IComputeFunc<TJobRes> clo) { return _compute.Execute(clo).Get(); } /** <inheritDoc /> */ + public Task<TRes> CallAsync<TRes>(IComputeFunc<TRes> clo) + { + return _compute.Execute(clo).Task; + } + + /** <inheritDoc /> */ public TJobRes AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo) { return _compute.AffinityCall(cacheName, affinityKey, clo).Get(); } /** <inheritDoc /> */ + public Task<TRes> AffinityCallAsync<TRes>(string cacheName, object affinityKey, IComputeFunc<TRes> clo) + { + return _compute.AffinityCall(cacheName, affinityKey, clo).Task; + } + + /** <inheritDoc /> */ public TJobRes Call<TJobRes>(Func<TJobRes> func) { return _compute.Execute(func).Get(); } /** <inheritDoc /> */ + public Task<TRes> CallAsync<TFuncRes, TRes>(IEnumerable<IComputeFunc<TFuncRes>> clos, IComputeReducer<TFuncRes, TRes> reducer) + { + return _compute.Execute(clos, reducer).Task; + } + + /** <inheritDoc /> */ public ICollection<TJobRes> Call<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos) { return _compute.Execute(clos).Get(); } /** <inheritDoc /> */ + public Task<ICollection<TRes>> CallAsync<TRes>(IEnumerable<IComputeFunc<TRes>> clos) + { + return _compute.Execute(clos).Task; + } + + /** <inheritDoc /> */ public TReduceRes Call<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos, IComputeReducer<TJobRes, TReduceRes> reducer) { @@ -164,52 +195,106 @@ namespace Apache.Ignite.Core.Impl.Compute } /** <inheritDoc /> */ + public Task<ICollection<TRes>> BroadcastAsync<TRes>(IComputeFunc<TRes> clo) + { + return _compute.Broadcast(clo).Task; + } + + /** <inheritDoc /> */ public ICollection<TJobRes> Broadcast<T, TJobRes>(IComputeFunc<T, TJobRes> clo, T arg) { return _compute.Broadcast(clo, arg).Get(); } /** <inheritDoc /> */ + public Task<ICollection<TRes>> BroadcastAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg) + { + return _compute.Broadcast(clo, arg).Task; + } + + /** <inheritDoc /> */ public void Broadcast(IComputeAction action) { _compute.Broadcast(action).Get(); } /** <inheritDoc /> */ + public Task BroadcastAsync(IComputeAction action) + { + return _compute.Broadcast(action).Task; + } + + /** <inheritDoc /> */ public void Run(IComputeAction action) { _compute.Run(action).Get(); } /** <inheritDoc /> */ + public Task RunAsync(IComputeAction action) + { + return _compute.Run(action).Task; + } + + /** <inheritDoc /> */ public void AffinityRun(string cacheName, object affinityKey, IComputeAction action) { _compute.AffinityRun(cacheName, affinityKey, action).Get(); } /** <inheritDoc /> */ + public Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action) + { + return _compute.AffinityRun(cacheName, affinityKey, action).Task; + } + + /** <inheritDoc /> */ public void Run(IEnumerable<IComputeAction> actions) { _compute.Run(actions).Get(); } /** <inheritDoc /> */ + public Task RunAsync(IEnumerable<IComputeAction> actions) + { + return _compute.Run(actions).Task; + } + + /** <inheritDoc /> */ public TJobRes Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg) { return _compute.Apply(clo, arg).Get(); } /** <inheritDoc /> */ + public Task<TRes> ApplyAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg) + { + return _compute.Apply(clo, arg).Task; + } + + /** <inheritDoc /> */ public ICollection<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args) { return _compute.Apply(clo, args).Get(); } /** <inheritDoc /> */ + public Task<ICollection<TRes>> ApplyAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, IEnumerable<TArg> args) + { + return _compute.Apply(clo, args).Task; + } + + /** <inheritDoc /> */ public TReduceRes Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args, IComputeReducer<TJobRes, TReduceRes> rdc) { return _compute.Apply(clo, args, rdc).Get(); } + + /** <inheritDoc /> */ + public Task<TRes> ApplyAsync<TArg, TFuncRes, TRes>(IComputeFunc<TArg, TFuncRes> clo, IEnumerable<TArg> args, IComputeReducer<TFuncRes, TRes> rdc) + { + return _compute.Apply(clo, args, rdc).Task; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs deleted file mode 100644 index 89c5b83..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Collections.Generic; - using System.Diagnostics.CodeAnalysis; - using System.Globalization; - using System.Threading; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Compute; - - /// <summary> - /// Asynchronous Compute facade. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class ComputeAsync : ICompute - { - /** */ - protected readonly ComputeImpl Compute; - - /** Current future. */ - private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>(); - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeAsync"/> class. - /// </summary> - /// <param name="computeImpl">The compute implementation.</param> - internal ComputeAsync(ComputeImpl computeImpl) - { - Compute = computeImpl; - } - - /** <inheritDoc /> */ - public ICompute WithAsync() - { - return this; - } - - /** <inheritDoc /> */ - public bool IsAsync - { - get { return true; } - } - - /** <inheritDoc /> */ - public IFuture GetFuture() - { - return GetFuture<object>(); - } - - /** <inheritDoc /> */ - public IFuture<TResult> GetFuture<TResult>() - { - var fut = _curFut.Value; - - if (fut == null) - throw new InvalidOperationException("Asynchronous operation not started."); - - var fut0 = fut as IFuture<TResult>; - - if (fut0 == null) - throw new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, - "Requested future type {0} is incompatible with current future type {1}", - typeof (IFuture<TResult>), fut.GetType())); - - _curFut.Value = null; - - return fut0; - } - - /** <inheritDoc /> */ - public IClusterGroup ClusterGroup - { - get { return Compute.ClusterGroup; } - } - - /** <inheritDoc /> */ - public ICompute WithNoFailover() - { - Compute.WithNoFailover(); - - return this; - } - - /** <inheritDoc /> */ - public ICompute WithTimeout(long timeout) - { - Compute.WithTimeout(timeout); - - return this; - } - - /** <inheritDoc /> */ - public ICompute WithKeepPortable() - { - Compute.WithKeepPortable(); - - return this; - } - - /** <inheritDoc /> */ - public TReduceRes ExecuteJavaTask<TReduceRes>(string taskName, object taskArg) - { - _curFut.Value = Compute.ExecuteJavaTaskAsync<TReduceRes>(taskName, taskArg); - - return default(TReduceRes); - } - - /** <inheritDoc /> */ - public TReduceRes Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task, TArg taskArg) - { - _curFut.Value = Compute.Execute(task, taskArg); - - return default(TReduceRes); - } - - /** <inheritDoc /> */ - public TReduceRes Execute<TJobRes, TReduceRes>(IComputeTask<TJobRes, TReduceRes> task) - { - _curFut.Value = Compute.Execute(task, null); - - return default(TReduceRes); - } - - /** <inheritDoc /> */ - public TReduceRes Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg) - { - _curFut.Value = Compute.Execute<TArg, TJobRes, TReduceRes>(taskType, taskArg); - - return default(TReduceRes); - } - - /** <inheritDoc /> */ - public TReduceRes Execute<TJobRes, TReduceRes>(Type taskType) - { - _curFut.Value = Compute.Execute<object, TJobRes, TReduceRes>(taskType, null); - - return default(TReduceRes); - } - - /** <inheritDoc /> */ - public TJobRes Call<TJobRes>(IComputeFunc<TJobRes> clo) - { - _curFut.Value = Compute.Execute(clo); - - return default(TJobRes); - } - - /** <inheritDoc /> */ - public TJobRes AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo) - { - Compute.AffinityCall(cacheName, affinityKey, clo); - - return default(TJobRes); - } - - /** <inheritDoc /> */ - public TJobRes Call<TJobRes>(Func<TJobRes> func) - { - _curFut.Value = Compute.Execute(func); - - return default(TJobRes); - } - - /** <inheritDoc /> */ - public ICollection<TJobRes> Call<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos) - { - _curFut.Value = Compute.Execute(clos); - - return null; - } - - /** <inheritDoc /> */ - public TReduceRes Call<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos, IComputeReducer<TJobRes, TReduceRes> reducer) - { - _curFut.Value = Compute.Execute(clos, reducer); - - return default(TReduceRes); - } - - /** <inheritDoc /> */ - public ICollection<TJobRes> Broadcast<TJobRes>(IComputeFunc<TJobRes> clo) - { - _curFut.Value = Compute.Broadcast(clo); - - return null; - } - - /** <inheritDoc /> */ - public ICollection<TJobRes> Broadcast<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg) - { - _curFut.Value = Compute.Broadcast(clo, arg); - - return null; - } - - /** <inheritDoc /> */ - public void Broadcast(IComputeAction action) - { - _curFut.Value = Compute.Broadcast(action); - } - - /** <inheritDoc /> */ - public void Run(IComputeAction action) - { - _curFut.Value = Compute.Run(action); - } - - /** <inheritDoc /> */ - public void AffinityRun(string cacheName, object affinityKey, IComputeAction action) - { - Compute.AffinityRun(cacheName, affinityKey, action); - } - - /** <inheritDoc /> */ - public void Run(IEnumerable<IComputeAction> actions) - { - _curFut.Value = Compute.Run(actions); - } - - /** <inheritDoc /> */ - public TJobRes Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg) - { - _curFut.Value = Compute.Apply(clo, arg); - - return default(TJobRes); - } - - /** <inheritDoc /> */ - public ICollection<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args) - { - _curFut.Value = Compute.Apply(clo, args); - - return null; - } - - /** <inheritDoc /> */ - public TReduceRes Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo, - IEnumerable<TArg> args, IComputeReducer<TJobRes, TReduceRes> rdc) - { - _curFut.Value = Compute.Apply(clo, args, rdc); - - return default(TReduceRes); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs index abd54da..7adc49f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -150,7 +150,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// If task for given name has not been deployed yet, /// then 'taskName' will be used as task class name to auto-deploy the task. /// </summary> - public IFuture<TReduceRes> ExecuteJavaTaskAsync<TReduceRes>(string taskName, object taskArg) + public Future<TReduceRes> ExecuteJavaTaskAsync<TReduceRes>(string taskName, object taskArg) { IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName"); @@ -158,7 +158,7 @@ namespace Apache.Ignite.Core.Impl.Compute try { - IFuture<TReduceRes> fut = null; + Future<TReduceRes> fut = null; DoOutInOp(OpExecAsync, writer => { @@ -183,7 +183,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="task">Task to execute.</param> /// <param name="taskArg">Optional task argument.</param> /// <returns>Task result.</returns> - public IFuture<TReduceRes> Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task, + public Future<TReduceRes> Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task, TArg taskArg) { IgniteArgumentCheck.NotNull(task, "task"); @@ -204,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="taskType">Task type.</param> /// <param name="taskArg">Optional task argument.</param> /// <returns>Task result.</returns> - public IFuture<TReduceRes> Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg) + public Future<TReduceRes> Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg) { IgniteArgumentCheck.NotNull(taskType, "taskType"); @@ -224,7 +224,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// </summary> /// <param name="clo">Job to execute.</param> /// <returns>Job result for this execution.</returns> - public IFuture<TJobRes> Execute<TJobRes>(IComputeFunc<TJobRes> clo) + public Future<TJobRes> Execute<TJobRes>(IComputeFunc<TJobRes> clo) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -238,7 +238,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// </summary> /// <param name="func">Func to execute.</param> /// <returns>Job result for this execution.</returns> - public IFuture<TJobRes> Execute<TJobRes>(Func<TJobRes> func) + public Future<TJobRes> Execute<TJobRes>(Func<TJobRes> func) { IgniteArgumentCheck.NotNull(func, "func"); @@ -253,7 +253,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// </summary> /// <param name="clos">Collection of jobs to execute.</param> /// <returns>Collection of job results for this execution.</returns> - public IFuture<ICollection<TJobRes>> Execute<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos) + public Future<ICollection<TJobRes>> Execute<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos) { IgniteArgumentCheck.NotNull(clos, "clos"); @@ -272,7 +272,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="clos">Collection of jobs to execute.</param> /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param> /// <returns>Collection of job results for this execution.</returns> - public IFuture<TReduceRes> Execute<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos, + public Future<TReduceRes> Execute<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos, IComputeReducer<TJobRes, TReduceRes> rdc) { IgniteArgumentCheck.NotNull(clos, "clos"); @@ -290,7 +290,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// </summary> /// <param name="clo">Job to broadcast to all projection nodes.</param> /// <returns>Collection of results for this execution.</returns> - public IFuture<ICollection<TJobRes>> Broadcast<TJobRes>(IComputeFunc<TJobRes> clo) + public Future<ICollection<TJobRes>> Broadcast<TJobRes>(IComputeFunc<TJobRes> clo) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -305,7 +305,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="clo">Job to broadcast to all projection nodes.</param> /// <param name="arg">Job closure argument.</param> /// <returns>Collection of results for this execution.</returns> - public IFuture<ICollection<TJobRes>> Broadcast<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg) + public Future<ICollection<TJobRes>> Broadcast<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -317,7 +317,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Broadcasts given job to all nodes in grid projection. /// </summary> /// <param name="action">Job to broadcast to all projection nodes.</param> - public IFuture<object> Broadcast(IComputeAction action) + public Future<object> Broadcast(IComputeAction action) { IgniteArgumentCheck.NotNull(action, "action"); @@ -329,7 +329,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Executes provided job on a node in this grid projection. /// </summary> /// <param name="action">Job to execute.</param> - public IFuture<object> Run(IComputeAction action) + public Future<object> Run(IComputeAction action) { IgniteArgumentCheck.NotNull(action, "action"); @@ -341,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Executes collection of jobs on Ignite nodes within this grid projection. /// </summary> /// <param name="actions">Jobs to execute.</param> - public IFuture<object> Run(IEnumerable<IComputeAction> actions) + public Future<object> Run(IEnumerable<IComputeAction> actions) { IgniteArgumentCheck.NotNull(actions, "actions"); @@ -369,7 +369,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="clo">Job to run.</param> /// <param name="arg">Job argument.</param> /// <returns>Job result for this execution.</returns> - public IFuture<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg) + public Future<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -385,7 +385,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="clo">Job to run.</param> /// <param name="args">Job arguments.</param> /// <returns>Collection of job results.</returns> - public IFuture<ICollection<TJobRes>> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, + public Future<ICollection<TJobRes>> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -413,7 +413,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="args">Job arguments.</param> /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param> /// <returns>Reduced job result for this execution.</returns> - public IFuture<TReduceRes> Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo, + public Future<TReduceRes> Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args, IComputeReducer<TJobRes, TReduceRes> rdc) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -440,7 +440,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="cacheName">Name of the cache to use for affinity co-location.</param> /// <param name="affinityKey">Affinity key.</param> /// <param name="action">Job to execute.</param> - public IFuture AffinityRun(string cacheName, object affinityKey, IComputeAction action) + public Future<object> AffinityRun(string cacheName, object affinityKey, IComputeAction action) { IgniteArgumentCheck.NotNull(action, "action"); @@ -458,7 +458,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="clo">Job to execute.</param> /// <returns>Job result for this execution.</returns> /// <typeparam name="TJobRes">Type of job result.</typeparam> - public IFuture<TJobRes> AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo) + public Future<TJobRes> AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -483,7 +483,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="jobs">Jobs.</param> /// <param name="broadcast">Broadcast flag.</param> /// <returns>Future.</returns> - private IFuture<TReduceRes> ExecuteClosures0<TArg, TJobRes, TReduceRes>( + private Future<TReduceRes> ExecuteClosures0<TArg, TJobRes, TReduceRes>( IComputeTask<TArg, TJobRes, TReduceRes> task, IComputeJob job, ICollection<IComputeJob> jobs, bool broadcast) { @@ -503,7 +503,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <returns>Future.</returns> [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "User code can throw any exception")] - private IFuture<TReduceRes> ExecuteClosures0<TArg, TJobRes, TReduceRes>( + private Future<TReduceRes> ExecuteClosures0<TArg, TJobRes, TReduceRes>( IComputeTask<TArg, TJobRes, TReduceRes> task, IComputeJob job = null, IEnumerable<IComputeJob> jobs = null, int opId = OpUnicast, int jobsCount = 0, Action<PortableWriterImpl> writeAction = null) http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs index ef27889..1cd13a8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs @@ -379,7 +379,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <summary> /// Task completion future. /// </summary> - internal IFuture<TR> Future + internal Future<TR> Future { get { return _fut; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs index 49cbc5a..576c805 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs @@ -22,7 +22,7 @@ namespace Apache.Ignite.Core.Impl.Datastream using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Threading; - using Apache.Ignite.Core.Common; + using System.Threading.Tasks; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Portable; @@ -69,15 +69,15 @@ namespace Apache.Ignite.Core.Impl.Datastream if (prev != null) Thread.MemoryBarrier(); // Prevent "prev" field escape. - _fut.Listen(() => ParentsCompleted()); + _fut.Task.ContinueWith(x => ParentsCompleted()); } /// <summary> - /// Gets the future. + /// Gets the task. /// </summary> - public IFuture Future + public Task Task { - get { return _fut; } + get { return _fut.Task; } } /// <summary> @@ -264,7 +264,7 @@ namespace Apache.Ignite.Core.Impl.Datastream return false; } - return _fut.IsDone; + return _fut.Task.IsCompleted; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs index 9894e93..586d19f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs @@ -21,7 +21,7 @@ namespace Apache.Ignite.Core.Impl.Datastream using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Threading; - using Apache.Ignite.Core.Common; + using System.Threading.Tasks; using Apache.Ignite.Core.Datastream; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Portable; @@ -326,13 +326,13 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** <inheritDoc /> */ - public IFuture Future + public Task Task { get { ThrowIfDisposed(); - return _closeFut; + return _closeFut.Task; } } @@ -396,7 +396,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** <inheritDoc /> */ - public IFuture AddData(TK key, TV val) + public Task AddData(TK key, TV val) { ThrowIfDisposed(); @@ -406,7 +406,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** <inheritDoc /> */ - public IFuture AddData(KeyValuePair<TK, TV> pair) + public Task AddData(KeyValuePair<TK, TV> pair) { ThrowIfDisposed(); @@ -414,7 +414,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** <inheritDoc /> */ - public IFuture AddData(ICollection<KeyValuePair<TK, TV>> entries) + public Task AddData(ICollection<KeyValuePair<TK, TV>> entries) { ThrowIfDisposed(); @@ -424,7 +424,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** <inheritDoc /> */ - public IFuture RemoveData(TK key) + public Task RemoveData(TK key) { ThrowIfDisposed(); @@ -585,7 +585,7 @@ namespace Apache.Ignite.Core.Impl.Datastream /// <param name="val">Value.</param> /// <param name="cnt">Items count.</param> /// <returns>Future.</returns> - private IFuture Add0(object val, int cnt) + private Task Add0(object val, int cnt) { int bufSndSize0 = _bufSndSize; @@ -610,7 +610,7 @@ namespace Apache.Ignite.Core.Impl.Datastream // Batch is too big, schedule flush. Flush0(batch0, false, PlcContinue); - return batch0.Future; + return batch0.Task; } }
