http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs index 5d1add640..bc1b4bb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs @@ -53,10 +53,11 @@ namespace Apache.Ignite.Core.Impl.Events EnableLocal = 8, DisableLocal = 9, GetEnabledEvents = 10, - WithAsync = 11, IsEnabled = 12, LocalListen = 13, - StopLocalListen = 14 + StopLocalListen = 14, + RemoteQueryAsync = 15, + WaitForLocalAsync = 16 } /** Map from user func to local wrapper, needed for invoke/unsubscribe. */ @@ -66,9 +67,6 @@ namespace Apache.Ignite.Core.Impl.Events /** Cluster group. */ private readonly IClusterGroup _clusterGroup; - /** Async instance. */ - private readonly Lazy<Events> _asyncInstance; - /// <summary> /// Initializes a new instance of the <see cref="Events" /> class. /// </summary> @@ -81,17 +79,6 @@ namespace Apache.Ignite.Core.Impl.Events Debug.Assert(clusterGroup != null); _clusterGroup = clusterGroup; - - _asyncInstance = new Lazy<Events>(() => new Events(this)); - } - - /// <summary> - /// Initializes a new async instance. - /// </summary> - /// <param name="events">The events.</param> - private Events(Events events) : base(UU.TargetOutObject(events.Target, (int) Op.WithAsync), events.Marshaller) - { - _clusterGroup = events.ClusterGroup; } /** <inheritDoc /> */ @@ -106,14 +93,6 @@ namespace Apache.Ignite.Core.Impl.Events get { return (Ignite) ClusterGroup.Ignite; } } - /// <summary> - /// Gets the asynchronous instance. - /// </summary> - private Events AsyncInstance - { - get { return _asyncInstance.Value; } - } - /** <inheritDoc /> */ public ICollection<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types) where T : IEvent @@ -121,14 +100,7 @@ namespace Apache.Ignite.Core.Impl.Events IgniteArgumentCheck.NotNull(filter, "filter"); return DoOutInOp((int) Op.RemoteQuery, - writer => - { - writer.Write(filter); - - writer.WriteLong((long) (timeout == null ? 0 : timeout.Value.TotalMilliseconds)); - - WriteEventTypes(types, writer); - }, + writer => WriteRemoteQuery(filter, timeout, types, writer), reader => ReadEvents<T>(reader)); } @@ -136,11 +108,11 @@ namespace Apache.Ignite.Core.Impl.Events public Task<ICollection<T>> RemoteQueryAsync<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types) where T : IEvent { - AsyncInstance.RemoteQuery(filter, timeout, types); + IgniteArgumentCheck.NotNull(filter, "filter"); // ReSharper disable once RedundantTypeArgumentsOfMethod (won't compile in VS2010) - return GetFuture<ICollection<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(AsyncInstance.Target, - futId, futTyp, (int) Op.RemoteQuery), convertFunc: ReadEvents<T>).Task; + return DoOutOpAsync<ICollection<T>>((int) Op.RemoteQueryAsync, + w => WriteRemoteQuery(filter, timeout, types, w), convertFunc: ReadEvents<T>); } /** <inheritDoc /> */ @@ -234,46 +206,53 @@ namespace Apache.Ignite.Core.Impl.Events /** <inheritDoc /> */ public T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent { - long hnd = 0; + var hnd = GetFilterHandle(filter); try { - return WaitForLocal0(filter, ref hnd, types); + return DoOutInOp((int) Op.WaitForLocal, + writer => + { + writer.WriteObject(hnd); + WriteEventTypes(types, writer); + }, + reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader))); } finally { - if (filter != null) - Ignite.HandleRegistry.Release(hnd); + if (hnd != null) + Ignite.HandleRegistry.Release(hnd.Value); } } /** <inheritDoc /> */ public Task<T> WaitForLocalAsync<T>(IEventFilter<T> filter, params int[] types) where T : IEvent { - long hnd = 0; + var hnd = GetFilterHandle(filter); try { - AsyncInstance.WaitForLocal0(filter, ref hnd, types); - - // ReSharper disable once RedundantTypeArgumentsOfMethod (won't compile in VS2010) - var fut = GetFuture<T>((futId, futTyp) => UU.TargetListenFutureForOperation(AsyncInstance.Target, futId, - futTyp, (int) Op.WaitForLocal), convertFunc: reader => (T) EventReader.Read<IEvent>(reader)); + var task = DoOutOpAsync((int) Op.WaitForLocalAsync, writer => + { + writer.WriteObject(hnd); + WriteEventTypes(types, writer); + }, convertFunc: EventReader.Read<T>); - if (filter != null) + if (hnd != null) { // Dispose handle as soon as future ends. - fut.Task.ContinueWith(x => Ignite.HandleRegistry.Release(hnd)); + task.ContinueWith(x => Ignite.HandleRegistry.Release(hnd.Value)); } - return fut.Task; + return task; } catch (Exception) { - Ignite.HandleRegistry.Release(hnd); + if (hnd != null) + Ignite.HandleRegistry.Release(hnd.Value); + throw; } - } /** <inheritDoc /> */ @@ -392,38 +371,13 @@ namespace Apache.Ignite.Core.Impl.Events } /// <summary> - /// Waits for the specified events. + /// Gets the filter handle. /// </summary> - /// <typeparam name="T">Type of events.</typeparam> - /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param> - /// <param name="handle">The filter handle, if applicable.</param> - /// <param name="types">Types of the events to wait for. - /// If not provided, all events will be passed to the filter.</param> - /// <returns>Ignite event.</returns> - private T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent - { - if (filter != null) - handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter - { - InvokeFunc = stream => InvokeLocalFilter(stream, filter) - }); - - var hnd = handle; - - return DoOutInOp((int)Op.WaitForLocal, - writer => - { - if (filter != null) - { - writer.WriteBoolean(true); - writer.WriteLong(hnd); - } - else - writer.WriteBoolean(false); - - WriteEventTypes(types, writer); - }, - reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader))); + private long? GetFilterHandle<T>(IEventFilter<T> filter) where T : IEvent + { + return filter != null + ? Ignite.HandleRegistry.Allocate(new LocalEventFilter<T>(Marshaller, filter)) + : (long?) null; } /// <summary> @@ -559,20 +513,6 @@ namespace Apache.Ignite.Core.Impl.Events /// <param name="stream">The stream.</param> /// <param name="listener">The listener.</param> /// <returns>Filter invocation result.</returns> - private bool InvokeLocalFilter<T>(IBinaryStream stream, IEventFilter<T> listener) where T : IEvent - { - var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream)); - - return listener.Invoke(evt); - } - - /// <summary> - /// Invokes local filter using data from specified stream. - /// </summary> - /// <typeparam name="T">Event object type.</typeparam> - /// <param name="stream">The stream.</param> - /// <param name="listener">The listener.</param> - /// <returns>Filter invocation result.</returns> private bool InvokeLocalListener<T>(IBinaryStream stream, IEventListener<T> listener) where T : IEvent { var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream)); @@ -614,17 +554,49 @@ namespace Apache.Ignite.Core.Impl.Events } /// <summary> + /// Writes the remote query. + /// </summary> + /// <param name="filter">The filter.</param> + /// <param name="timeout">The timeout.</param> + /// <param name="types">The types.</param> + /// <param name="writer">The writer.</param> + private static void WriteRemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout, int[] types, + IBinaryRawWriter writer) + where T : IEvent + { + writer.WriteObject(filter); + + writer.WriteLong((long)(timeout == null ? 0 : timeout.Value.TotalMilliseconds)); + + WriteEventTypes(types, writer); + } + + /// <summary> /// Local user filter wrapper. /// </summary> - private class LocalEventFilter : IInteropCallback + private class LocalEventFilter<T> : IInteropCallback where T : IEvent { /** */ - public Func<IBinaryStream, bool> InvokeFunc; + private readonly Marshaller _marshaller; + + /** */ + private readonly IEventFilter<T> _listener; + + /// <summary> + /// Initializes a new instance of the <see cref="LocalEventFilter{T}"/> class. + /// </summary> + public LocalEventFilter(Marshaller marshaller, IEventFilter<T> listener) + { + _marshaller = marshaller; + _listener = listener; + } /** <inheritdoc /> */ public int Invoke(IBinaryStream stream) { - return InvokeFunc(stream) ? 1 : 0; + var evt = EventReader.Read<T>(_marshaller.StartUnmarshal(stream)); + + return _listener.Invoke(evt) ? 1 : 0; } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index a8e3075..79df470 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -485,7 +485,7 @@ namespace Apache.Ignite.Core.Impl /// </returns> public ICache<TK, TV> Cache<TK, TV>(IUnmanagedTarget nativeCache, bool keepBinary = false) { - return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false, false); + return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false); } /** <inheritdoc /> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs index 2216d1a..1b43438 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Messaging using System.Diagnostics; using System.Linq; using System.Threading.Tasks; + using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Collections; @@ -30,7 +31,6 @@ namespace Apache.Ignite.Core.Impl.Messaging using Apache.Ignite.Core.Impl.Resource; using Apache.Ignite.Core.Impl.Unmanaged; using Apache.Ignite.Core.Messaging; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Messaging functionality. @@ -49,7 +49,8 @@ namespace Apache.Ignite.Core.Impl.Messaging SendOrdered = 5, StopLocalListen = 6, StopRemoteListen = 7, - WithAsync = 8 + RemoteListenAsync = 9, + StopRemoteListenAsync = 10 } /** Map from user (func+topic) -> id, needed for unsubscription. */ @@ -59,12 +60,6 @@ namespace Apache.Ignite.Core.Impl.Messaging /** Grid */ private readonly Ignite _ignite; - /** Async instance. */ - private readonly Lazy<Messaging> _asyncInstance; - - /** Async flag. */ - private readonly bool _isAsync; - /** Cluster group. */ private readonly IClusterGroup _clusterGroup; @@ -82,20 +77,6 @@ namespace Apache.Ignite.Core.Impl.Messaging _clusterGroup = prj; _ignite = (Ignite) prj.Ignite; - - _asyncInstance = new Lazy<Messaging>(() => new Messaging(this)); - } - - /// <summary> - /// Initializes a new async instance. - /// </summary> - /// <param name="messaging">The messaging.</param> - private Messaging(Messaging messaging) : base( - UU.TargetOutObject(messaging.Target, (int) Op.WithAsync), messaging.Marshaller) - { - _isAsync = true; - _ignite = messaging._ignite; - _clusterGroup = messaging.ClusterGroup; } /** <inheritdoc /> */ @@ -104,16 +85,7 @@ namespace Apache.Ignite.Core.Impl.Messaging get { return _clusterGroup; } } - /// <summary> - /// Gets the asynchronous instance. - /// </summary> - private Messaging AsyncInstance - { - get { return _asyncInstance.Value; } - } - /** <inheritdoc /> */ - public void Send(object message, object topic = null) { IgniteArgumentCheck.NotNull(message, "message"); @@ -216,65 +188,28 @@ namespace Apache.Ignite.Core.Impl.Messaging /** <inheritdoc /> */ public Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null) { - IgniteArgumentCheck.NotNull(listener, "filter"); - - var filter0 = MessageListenerHolder.CreateLocal(_ignite, listener); - var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0); - - try - { - Guid id = Guid.Empty; - - DoOutInOp((int) Op.RemoteListen, - writer => - { - writer.Write(filter0); - writer.WriteLong(filterHnd); - writer.Write(topic); - }, - input => - { - var id0 = Marshaller.StartUnmarshal(input).GetRawReader().ReadGuid(); - - Debug.Assert(_isAsync || id0.HasValue); - - if (id0.HasValue) - id = id0.Value; - }); - - return id; - } - catch (Exception) - { - _ignite.HandleRegistry.Release(filterHnd); - - throw; - } + return RemoteListen(listener, topic, + (writeAct, readAct) => DoOutInOp((int) Op.RemoteListen, writeAct, + stream => readAct(Marshaller.StartUnmarshal(stream)))); } /** <inheritdoc /> */ public Task<Guid> RemoteListenAsync<T>(IMessageListener<T> listener, object topic = null) { - AsyncInstance.RemoteListen(listener, topic); - - return AsyncInstance.GetTask<Guid>(); + return RemoteListen(listener, topic, + (writeAct, readAct) => DoOutOpAsync((int) Op.RemoteListenAsync, writeAct, convertFunc: readAct)); } /** <inheritdoc /> */ public void StopRemoteListen(Guid opId) { - DoOutOp((int) Op.StopRemoteListen, writer => - { - writer.WriteGuid(opId); - }); + DoOutOp((int) Op.StopRemoteListen, writer => writer.WriteGuid(opId)); } /** <inheritdoc /> */ public Task StopRemoteListenAsync(Guid opId) { - AsyncInstance.StopRemoteListen(opId); - - return AsyncInstance.GetTask(); + return DoOutOpAsync((int) Op.StopRemoteListenAsync, writer => writer.WriteGuid(opId)); } /// <summary> @@ -287,5 +222,33 @@ namespace Apache.Ignite.Core.Impl.Messaging { return new KeyValuePair<object, object>(filter, topic); } + + /// <summary> + /// Remotes listen. + /// </summary> + private TRes RemoteListen<T, TRes>(IMessageListener<T> filter, object topic, + Func<Action<IBinaryRawWriter>, Func<BinaryReader, Guid>, TRes> invoker) + { + IgniteArgumentCheck.NotNull(filter, "filter"); + + var filter0 = MessageListenerHolder.CreateLocal(_ignite, filter); + var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0); + + try + { + return invoker(writer => + { + writer.WriteObject(filter0); + writer.WriteLong(filterHnd); + writer.WriteObject(topic); + }, input => input.ReadGuid() ?? Guid.Empty); + } + catch (Exception) + { + _ignite.HandleRegistry.Release(filterHnd); + + throw; + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs index f392830..d5b69a4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl using System.Diagnostics.CodeAnalysis; using System.IO; using System.Threading.Tasks; + using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Binary.Metadata; @@ -757,6 +758,102 @@ namespace Apache.Ignite.Core.Impl #endregion + #region Async operations + + /// <summary> + /// Performs async operation. + /// </summary> + /// <param name="type">The type code.</param> + /// <param name="writeAction">The write action.</param> + /// <returns>Task for async operation</returns> + protected Task DoOutOpAsync(int type, Action<BinaryWriter> writeAction = null) + { + return DoOutOpAsync<object>(type, writeAction); + } + + /// <summary> + /// Performs async operation. + /// </summary> + /// <typeparam name="T">Type of the result.</typeparam> + /// <param name="type">The type code.</param> + /// <param name="writeAction">The write action.</param> + /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param> + /// <param name="convertFunc">The function to read future result from stream.</param> + /// <returns>Task for async operation</returns> + protected Task<T> DoOutOpAsync<T>(int type, Action<BinaryWriter> writeAction = null, bool keepBinary = false, + Func<BinaryReader, T> convertFunc = null) + { + return GetFuture((futId, futType) => DoOutOp(type, w => + { + if (writeAction != null) + writeAction(w); + w.WriteLong(futId); + w.WriteInt(futType); + }), keepBinary, convertFunc).Task; + } + + /// <summary> + /// Performs async operation. + /// </summary> + /// <typeparam name="T">Type of the result.</typeparam> + /// <param name="type">The type code.</param> + /// <param name="writeAction">The write action.</param> + /// <returns>Future for async operation</returns> + protected Future<T> DoOutOpObjectAsync<T>(int type, Action<IBinaryRawWriter> writeAction) + { + return GetFuture<T>((futId, futType) => DoOutOpObject(type, w => + { + writeAction(w); + w.WriteLong(futId); + w.WriteInt(futType); + })); + } + + /// <summary> + /// Performs async operation. + /// </summary> + /// <typeparam name="TR">Type of the result.</typeparam> + /// <typeparam name="T1">The type of the first arg.</typeparam> + /// <param name="type">The type code.</param> + /// <param name="val1">First arg.</param> + /// <returns> + /// Task for async operation + /// </returns> + protected Task<TR> DoOutOpAsync<T1, TR>(int type, T1 val1) + { + return GetFuture<TR>((futId, futType) => DoOutOp(type, w => + { + w.WriteObject(val1); + w.WriteLong(futId); + w.WriteInt(futType); + })).Task; + } + + /// <summary> + /// Performs async operation. + /// </summary> + /// <typeparam name="TR">Type of the result.</typeparam> + /// <typeparam name="T1">The type of the first arg.</typeparam> + /// <typeparam name="T2">The type of the second arg.</typeparam> + /// <param name="type">The type code.</param> + /// <param name="val1">First arg.</param> + /// <param name="val2">Second arg.</param> + /// <returns> + /// Task for async operation + /// </returns> + protected Task<TR> DoOutOpAsync<T1, T2, TR>(int type, T1 val1, T2 val2) + { + return GetFuture<TR>((futId, futType) => DoOutOp(type, w => + { + w.WriteObject(val1); + w.WriteObject(val2); + w.WriteLong(futId); + w.WriteInt(futType); + })).Task; + } + + #endregion + #region Miscelanneous /// <summary> @@ -846,7 +943,7 @@ namespace Apache.Ignite.Core.Impl /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param> /// <param name="convertFunc">The function to read future result from stream.</param> /// <returns>Created future.</returns> - protected Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false, + private Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false, Func<BinaryReader, T> convertFunc = null) { var futType = FutureType.Object; @@ -862,7 +959,18 @@ namespace Apache.Ignite.Core.Impl var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut); - var futTarget = listenAction(futHnd, (int) futType); + IUnmanagedTarget futTarget; + + try + { + futTarget = listenAction(futHnd, (int)futType); + } + catch (Exception) + { + _marsh.Ignite.HandleRegistry.Release(futHnd); + + throw; + } fut.SetTarget(futTarget); @@ -893,25 +1001,18 @@ namespace Apache.Ignite.Core.Impl var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut); - listenAction(futHnd, (int)futType); - - return fut; - } + try + { + listenAction(futHnd, (int)futType); + } + catch (Exception) + { + _marsh.Ignite.HandleRegistry.Release(futHnd); - /// <summary> - /// Creates a task to listen for the last async op. - /// </summary> - protected Task GetTask() - { - return GetTask<object>(); - } + throw; + } - /// <summary> - /// Creates a task to listen for the last async op. - /// </summary> - protected Task<T> GetTask<T>() - { - return GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp)).Task; + return fut; } #endregion http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs index 8fc973b..9d9acfd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs @@ -17,12 +17,12 @@ namespace Apache.Ignite.Core.Impl.Services { - using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Reflection; using System.Threading.Tasks; + using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Common; @@ -51,9 +51,6 @@ namespace Apache.Ignite.Core.Impl.Services private const int OpDescriptors = 5; /** */ - private const int OpWithAsync = 6; - - /** */ private const int OpWithServerKeepBinary = 7; /** */ @@ -66,6 +63,18 @@ namespace Apache.Ignite.Core.Impl.Services private const int OpCancelAll = 10; /** */ + private const int OpDeployAsync = 11; + + /** */ + private const int OpDeployMultipleAsync = 12; + + /** */ + private const int OpCancelAsync = 13; + + /** */ + private const int OpCancelAllAsync = 14; + + /** */ private readonly IClusterGroup _clusterGroup; /** Invoker binary flag. */ @@ -74,9 +83,6 @@ namespace Apache.Ignite.Core.Impl.Services /** Server binary flag. */ private readonly bool _srvKeepBinary; - /** Async instance. */ - private readonly Lazy<Services> _asyncInstance; - /// <summary> /// Initializes a new instance of the <see cref="Services" /> class. /// </summary> @@ -94,20 +100,6 @@ namespace Apache.Ignite.Core.Impl.Services _clusterGroup = clusterGroup; _keepBinary = keepBinary; _srvKeepBinary = srvKeepBinary; - - _asyncInstance = new Lazy<Services>(() => new Services(this)); - } - - /// <summary> - /// Initializes a new async instance. - /// </summary> - /// <param name="services">The services.</param> - private Services(Services services) : base(UU.TargetOutObject(services.Target, OpWithAsync), - services.Marshaller) - { - _clusterGroup = services.ClusterGroup; - _keepBinary = services._keepBinary; - _srvKeepBinary = services._srvKeepBinary; } /** <inheritDoc /> */ @@ -134,14 +126,6 @@ namespace Apache.Ignite.Core.Impl.Services get { return _clusterGroup; } } - /// <summary> - /// Gets the asynchronous instance. - /// </summary> - private Services AsyncInstance - { - get { return _asyncInstance.Value; } - } - /** <inheritDoc /> */ public void DeployClusterSingleton(string name, IService service) { @@ -154,9 +138,10 @@ namespace Apache.Ignite.Core.Impl.Services /** <inheritDoc /> */ public Task DeployClusterSingletonAsync(string name, IService service) { - AsyncInstance.DeployClusterSingleton(name, service); + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + IgniteArgumentCheck.NotNull(service, "service"); - return AsyncInstance.GetTask(); + return DeployMultipleAsync(name, service, 1, 1); } /** <inheritDoc /> */ @@ -171,9 +156,10 @@ namespace Apache.Ignite.Core.Impl.Services /** <inheritDoc /> */ public Task DeployNodeSingletonAsync(string name, IService service) { - AsyncInstance.DeployNodeSingleton(name, service); + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + IgniteArgumentCheck.NotNull(service, "service"); - return AsyncInstance.GetTask(); + return DeployMultipleAsync(name, service, 0, 1); } /** <inheritDoc /> */ @@ -197,9 +183,19 @@ namespace Apache.Ignite.Core.Impl.Services /** <inheritDoc /> */ public Task DeployKeyAffinitySingletonAsync<TK>(string name, IService service, string cacheName, TK affinityKey) { - AsyncInstance.DeployKeyAffinitySingleton(name, service, cacheName, affinityKey); + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + IgniteArgumentCheck.NotNull(service, "service"); + IgniteArgumentCheck.NotNull(affinityKey, "affinityKey"); - return AsyncInstance.GetTask(); + return DeployAsync(new ServiceConfiguration + { + Name = name, + Service = service, + CacheName = cacheName, + AffinityKey = affinityKey, + TotalCount = 1, + MaxPerNodeCount = 1 + }); } /** <inheritDoc /> */ @@ -220,9 +216,16 @@ namespace Apache.Ignite.Core.Impl.Services /** <inheritDoc /> */ public Task DeployMultipleAsync(string name, IService service, int totalCount, int maxPerNodeCount) { - AsyncInstance.DeployMultiple(name, service, totalCount, maxPerNodeCount); + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + IgniteArgumentCheck.NotNull(service, "service"); - return AsyncInstance.GetTask(); + return DoOutOpAsync(OpDeployMultipleAsync, w => + { + w.WriteString(name); + w.WriteObject(service); + w.WriteInt(totalCount); + w.WriteInt(maxPerNodeCount); + }); } /** <inheritDoc /> */ @@ -230,28 +233,15 @@ namespace Apache.Ignite.Core.Impl.Services { IgniteArgumentCheck.NotNull(configuration, "configuration"); - DoOutOp(OpDeploy, w => - { - w.WriteString(configuration.Name); - w.WriteObject(configuration.Service); - w.WriteInt(configuration.TotalCount); - w.WriteInt(configuration.MaxPerNodeCount); - w.WriteString(configuration.CacheName); - w.WriteObject(configuration.AffinityKey); - - if (configuration.NodeFilter != null) - w.WriteObject(configuration.NodeFilter); - else - w.WriteObject<object>(null); - }); + DoOutOp(OpDeploy, w => WriteServiceConfiguration(configuration, w)); } /** <inheritDoc /> */ public Task DeployAsync(ServiceConfiguration configuration) { - AsyncInstance.Deploy(configuration); + IgniteArgumentCheck.NotNull(configuration, "configuration"); - return AsyncInstance.GetTask(); + return DoOutOpAsync(OpDeployAsync, w => WriteServiceConfiguration(configuration, w)); } /** <inheritDoc /> */ @@ -265,9 +255,9 @@ namespace Apache.Ignite.Core.Impl.Services /** <inheritDoc /> */ public Task CancelAsync(string name) { - AsyncInstance.Cancel(name); + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); - return AsyncInstance.GetTask(); + return DoOutOpAsync(OpCancelAsync, w => w.WriteString(name)); } /** <inheritDoc /> */ @@ -279,9 +269,7 @@ namespace Apache.Ignite.Core.Impl.Services /** <inheritDoc /> */ public Task CancelAllAsync() { - AsyncInstance.CancelAll(); - - return AsyncInstance.GetTask(); + return DoOutOpAsync(OpCancelAllAsync); } /** <inheritDoc /> */ @@ -391,5 +379,26 @@ namespace Apache.Ignite.Core.Impl.Services writer => ServiceProxySerializer.WriteProxyMethod(writer, method, args, platform), stream => ServiceProxySerializer.ReadInvocationResult(stream, Marshaller, _keepBinary), proxy.Target); } + + /// <summary> + /// Writes the service configuration. + /// </summary> + private static void WriteServiceConfiguration(ServiceConfiguration configuration, IBinaryRawWriter w) + { + Debug.Assert(configuration != null); + Debug.Assert(w != null); + + w.WriteString(configuration.Name); + w.WriteObject(configuration.Service); + w.WriteInt(configuration.TotalCount); + w.WriteInt(configuration.MaxPerNodeCount); + w.WriteString(configuration.CacheName); + w.WriteObject(configuration.AffinityKey); + + if (configuration.NodeFilter != null) + w.WriteObject(configuration.NodeFilter); + else + w.WriteObject<object>(null); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs index 796044d..7de9be1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs @@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Transactions using System.Threading.Tasks; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Unmanaged; using Apache.Ignite.Core.Transactions; @@ -214,11 +213,7 @@ namespace Apache.Ignite.Core.Impl.Transactions /// </summary> internal Task CommitAsync(TransactionImpl tx) { - return GetFuture<object>((futId, futTyp) => DoOutOp(OpCommitAsync, (IBinaryStream s) => - { - s.WriteLong(tx.Id); - s.WriteLong(futId); - })).Task; + return DoOutOpAsync(OpCommitAsync, w => w.WriteLong(tx.Id)); } /// <summary> @@ -226,11 +221,7 @@ namespace Apache.Ignite.Core.Impl.Transactions /// </summary> internal Task RollbackAsync(TransactionImpl tx) { - return GetFuture<object>((futId, futTyp) => DoOutOp(OpRollbackAsync, (IBinaryStream s) => - { - s.WriteLong(tx.Id); - s.WriteLong(futId); - })).Task; + return DoOutOpAsync(OpRollbackAsync, w => w.WriteLong(tx.Id)); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs index c352f0c..6b867de 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs @@ -150,18 +150,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetOutObject")] public static extern void* TargetOutObject(void* ctx, void* target, int opType); - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFuture")] - public static extern void TargetListenFut(void* ctx, void* target, long futId, int typ); - - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperation")] - public static extern void TargetListenFutForOp(void* ctx, void* target, long futId, int typ, int opId); - - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureAndGet")] - public static extern void* TargetListenFutAndGet(void* ctx, void* target, long futId, int typ); - - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperationAndGet")] - public static extern void* TargetListenFutForOpAndGet(void* ctx, void* target, long futId, int typ, int opId); - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAcquire")] public static extern void* Acquire(void* ctx, void* target); http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs index b1d4ecd..36dc332 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs @@ -460,31 +460,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged return target.ChangeTarget(res); } - internal static void TargetListenFuture(IUnmanagedTarget target, long futId, int typ) - { - JNI.TargetListenFut(target.Context, target.Target, futId, typ); - } - - internal static void TargetListenFutureForOperation(IUnmanagedTarget target, long futId, int typ, int opId) - { - JNI.TargetListenFutForOp(target.Context, target.Target, futId, typ, opId); - } - - internal static IUnmanagedTarget TargetListenFutureAndGet(IUnmanagedTarget target, long futId, int typ) - { - var res = JNI.TargetListenFutAndGet(target.Context, target.Target, futId, typ); - - return target.ChangeTarget(res); - } - - internal static IUnmanagedTarget TargetListenFutureForOperationAndGet(IUnmanagedTarget target, long futId, - int typ, int opId) - { - var res = JNI.TargetListenFutForOpAndGet(target.Context, target.Target, futId, typ, opId); - - return target.ChangeTarget(res); - } - #endregion #region NATIVE METHODS: MISCELANNEOUS http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings index ac065bc..72ce015 100644 --- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings +++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings @@ -1,5 +1,6 @@ <wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation"> <s:String x:Key="/Default/CodeInspection/CSharpLanguageProject/LanguageLevel/@EntryValue">CSharp50</s:String> - <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean> + <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean> <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/QualifiedUsingAtNestedScope/@EntryValue">True</s:Boolean> + <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String> </wpf:ResourceDictionary> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings index cf9e287..9672abe 100644 --- a/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings +++ b/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings @@ -25,6 +25,7 @@ <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=StaticMemberInGenericType/@EntryIndexedValue">DO_NOT_SHOW</s:String> <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=StaticFieldInGenericType/@EntryIndexedValue">DO_NOT_SHOW</s:String> <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=VirtualMemberNeverOverriden_002EGlobal/@EntryIndexedValue">DO_NOT_SHOW</s:String> + <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String> <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean> <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/QualifiedUsingAtNestedScope/@EntryValue">True</s:Boolean></wpf:ResourceDictionary> \ No newline at end of file
