http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs index c91334d..b717d14 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs @@ -27,7 +27,6 @@ namespace Apache.Ignite.Core.Impl.Datastream using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Cache; using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Binary wrapper for <see cref="IStreamReceiver{TK,TV}"/>. @@ -44,7 +43,7 @@ namespace Apache.Ignite.Core.Impl.Datastream private readonly object _rcv; /** Invoker delegate. */ - private readonly Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> _invoke; + private readonly Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> _invoke; /// <summary> /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class. @@ -77,7 +76,7 @@ namespace Apache.Ignite.Core.Impl.Datastream /// <param name="rcv">Receiver.</param> /// <param name="invoke">Invoke delegate.</param> public StreamReceiverHolder(object rcv, - Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> invoke) + Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> invoke) { Debug.Assert(rcv != null); Debug.Assert(invoke != null); @@ -109,7 +108,7 @@ namespace Apache.Ignite.Core.Impl.Datastream /// <param name="cache">Cache.</param> /// <param name="stream">Stream.</param> /// <param name="keepBinary">Binary flag.</param> - public void Receive(Ignite grid, IUnmanagedTarget cache, IBinaryStream stream, bool keepBinary) + public void Receive(Ignite grid, IPlatformTargetInternal cache, IBinaryStream stream, bool keepBinary) { Debug.Assert(grid != null); Debug.Assert(cache != null); @@ -126,8 +125,8 @@ namespace Apache.Ignite.Core.Impl.Datastream /// <param name="cache">Cache.</param> /// <param name="stream">Stream.</param> /// <param name="keepBinary">Binary flag.</param> - public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, IUnmanagedTarget cache, - IBinaryStream stream, bool keepBinary) + public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, + IPlatformTargetInternal cache, IBinaryStream stream, bool keepBinary) { var reader = grid.Marshaller.StartUnmarshal(stream, keepBinary); @@ -138,7 +137,7 @@ namespace Apache.Ignite.Core.Impl.Datastream for (var i = 0; i < size; i++) entries.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>())); - receiver.Receive(grid.GetCache<TK, TV>(cache, keepBinary), entries); + receiver.Receive(Ignite.GetCache<TK, TV>(cache, keepBinary), entries); } } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs index eb454d6..3c7363e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs @@ -30,13 +30,11 @@ namespace Apache.Ignite.Core.Impl.Events using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Handle; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Ignite events. /// </summary> - internal sealed class Events : PlatformTarget, IEvents + internal sealed class Events : PlatformTargetAdapter, IEvents { /// <summary> /// Opcodes. @@ -66,15 +64,14 @@ namespace Apache.Ignite.Core.Impl.Events /** Cluster group. */ private readonly IClusterGroup _clusterGroup; - + /// <summary> /// Initializes a new instance of the <see cref="Events" /> class. /// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> /// <param name="clusterGroup">Cluster group.</param> - public Events(IUnmanagedTarget target, Marshaller marsh, IClusterGroup clusterGroup) - : base(target, marsh) + public Events(IPlatformTargetInternal target, IClusterGroup clusterGroup) + : base(target) { Debug.Assert(clusterGroup != null); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs new file mode 100644 index 0000000..23174b4 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl +{ + using System; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Interop; + + /// <summary> + /// Extended platform target interface with methods that operate on internal entities (streams and targets). + /// </summary> + internal interface IPlatformTargetInternal : IPlatformTarget, IDisposable + { + /// <summary> + /// Gets the marshaller. + /// </summary> + Marshaller Marshaller { get; } + + /// <summary> + /// Performs InStreamOutLong operation. + /// </summary> + /// <param name="type">Operation type code.</param> + /// <param name="writeAction">Write action.</param> + /// <returns>Result.</returns> + long InStreamOutLong(int type, Action<IBinaryStream> writeAction); + + /// <summary> + /// Performs InStreamOutLong operation with stream reuse. + /// </summary> + /// <param name="type">Operation type code.</param> + /// <param name="writeAction">Write action.</param> + /// <param name="readAction">Read action.</param> + /// <param name="readErrorAction">Error action.</param> + /// <returns> + /// Result. + /// </returns> + T InStreamOutLong<T>(int type, Action<IBinaryStream> writeAction, Func<IBinaryStream, long, T> readAction, + Func<IBinaryStream, Exception> readErrorAction); + + /// <summary> + /// Performs InStreamOutStream operation. + /// </summary> + /// <typeparam name="T">Result type.</typeparam> + /// <param name="type">Operation type code.</param> + /// <param name="writeAction">Write action.</param> + /// <param name="readAction">Read action.</param> + /// <returns>Result.</returns> + T InStreamOutStream<T>(int type, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readAction); + + /// <summary> + /// Performs InStreamOutObject operation. + /// </summary> + /// <param name="type">Operation type code.</param> + /// <param name="writeAction">Write action.</param> + /// <returns>Result.</returns> + IPlatformTargetInternal InStreamOutObject(int type, Action<IBinaryStream> writeAction); + + /// <summary> + /// Performs InObjectStreamOutObjectStream operation. + /// </summary> + /// <typeparam name="T">Result type.</typeparam> + /// <param name="type">Operation type code.</param> + /// <param name="arg">Target argument.</param> + /// <param name="writeAction">Write action.</param> + /// <param name="readAction">Read action.</param> + /// <returns>Result.</returns> + T InObjectStreamOutObjectStream<T>(int type, Action<IBinaryStream> writeAction, + Func<IBinaryStream, IPlatformTargetInternal, T> readAction, IPlatformTargetInternal arg); + + /// <summary> + /// Performs OutStream operation. + /// </summary> + /// <typeparam name="T">Result type.</typeparam> + /// <param name="type">Operation type code.</param> + /// <param name="readAction">Read action.</param> + /// <returns>Result.</returns> + T OutStream<T>(int type, Func<IBinaryStream, T> readAction); + + /// <summary> + /// Performs the OutObject operation. + /// </summary> + /// <param name="type">Operation type code.</param> + /// <returns>Result.</returns> + IPlatformTargetInternal OutObjectInternal(int type); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index 715776e..aae6ce7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -28,6 +28,7 @@ namespace Apache.Ignite.Core.Impl using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Compute; using Apache.Ignite.Core.Datastream; using Apache.Ignite.Core.DataStructures; @@ -54,7 +55,7 @@ namespace Apache.Ignite.Core.Impl /// <summary> /// Native Ignite wrapper. /// </summary> - internal class Ignite : PlatformTarget, IIgnite, ICluster + internal class Ignite : PlatformTargetAdapter, IIgnite, ICluster { /// <summary> /// Operation codes for PlatformProcessorImpl calls. @@ -92,7 +93,7 @@ namespace Apache.Ignite.Core.Impl private readonly string _name; /** Unmanaged node. */ - private readonly IUnmanagedTarget _proc; + private readonly IPlatformTargetInternal _proc; /** Marshaller. */ private readonly Marshaller _marsh; @@ -138,8 +139,8 @@ namespace Apache.Ignite.Core.Impl /// <param name="marsh">Marshaller.</param> /// <param name="lifecycleHandlers">Lifecycle beans.</param> /// <param name="cbs">Callbacks.</param> - public Ignite(IgniteConfiguration cfg, string name, IUnmanagedTarget proc, Marshaller marsh, - IList<LifecycleHandlerHolder> lifecycleHandlers, UnmanagedCallbacks cbs) : base(proc, marsh) + public Ignite(IgniteConfiguration cfg, string name, IPlatformTargetInternal proc, Marshaller marsh, + IList<LifecycleHandlerHolder> lifecycleHandlers, UnmanagedCallbacks cbs) : base(proc) { Debug.Assert(cfg != null); Debug.Assert(proc != null); @@ -156,17 +157,17 @@ namespace Apache.Ignite.Core.Impl marsh.Ignite = this; - _prj = new ClusterGroupImpl(DoOutOpObject((int) Op.GetClusterGroup), this, null); + _prj = new ClusterGroupImpl(Target.OutObjectInternal((int) Op.GetClusterGroup), null); _binary = new Binary.Binary(marsh); - _binaryProc = new BinaryProcessor(DoOutOpObject((int) Op.GetBinaryProcessor), marsh); + _binaryProc = new BinaryProcessor(DoOutOpObject((int) Op.GetBinaryProcessor)); cbs.Initialize(this); // Grid is not completely started here, can't initialize interop transactions right away. _transactions = new Lazy<TransactionsImpl>( - () => new TransactionsImpl(DoOutOpObject((int) Op.GetTransactions), marsh, GetLocalNode().Id)); + () => new TransactionsImpl(DoOutOpObject((int) Op.GetTransactions), GetLocalNode().Id)); // Set reconnected task to completed state for convenience. _clientReconnectTaskCompletionSource.SetResult(false); @@ -380,7 +381,14 @@ namespace Apache.Ignite.Core.Impl /// <param name="cancel">Cancel flag.</param> internal unsafe void Stop(bool cancel) { - UU.IgnitionStop(_proc.Context, Name, cancel); + var jniTarget = _proc as PlatformJniTarget; + + if (jniTarget == null) + { + throw new IgniteException("Ignition.Stop is not supported in thin client."); + } + + UU.IgnitionStop(jniTarget.Target.Context, Name, cancel); _cbs.Cleanup(); } @@ -507,9 +515,9 @@ namespace Apache.Ignite.Core.Impl /// <returns> /// New instance of cache wrapping specified native cache. /// </returns> - public ICache<TK, TV> GetCache<TK, TV>(IUnmanagedTarget nativeCache, bool keepBinary = false) + public static ICache<TK, TV> GetCache<TK, TV>(IPlatformTargetInternal nativeCache, bool keepBinary = false) { - return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false, false); + return new CacheImpl<TK, TV>(nativeCache, false, keepBinary, false, false); } /** <inheritdoc /> */ @@ -585,7 +593,7 @@ namespace Apache.Ignite.Core.Impl var aff = DoOutOpObject((int) Op.GetAffinity, w => w.WriteString(cacheName)); - return new CacheAffinityImpl(aff, _marsh, false, this); + return new CacheAffinityImpl(aff, false); } /** <inheritdoc /> */ @@ -627,7 +635,7 @@ namespace Apache.Ignite.Core.Impl if (nativeLong == null) return null; - return new AtomicLong(nativeLong, Marshaller, name); + return new AtomicLong(nativeLong, name); } /** <inheritdoc /> */ @@ -645,7 +653,7 @@ namespace Apache.Ignite.Core.Impl if (nativeSeq == null) return null; - return new AtomicSequence(nativeSeq, Marshaller, name); + return new AtomicSequence(nativeSeq, name); } /** <inheritdoc /> */ @@ -660,7 +668,7 @@ namespace Apache.Ignite.Core.Impl w.WriteBoolean(create); }); - return refTarget == null ? null : new AtomicReference<T>(refTarget, Marshaller, name); + return refTarget == null ? null : new AtomicReference<T>(refTarget, name); } /** <inheritdoc /> */ @@ -685,7 +693,7 @@ namespace Apache.Ignite.Core.Impl /** <inheritdoc /> */ public ICollection<string> GetCacheNames() { - return OutStream((int) Op.GetCacheNames, r => + return Target.OutStream((int) Op.GetCacheNames, r => { var res = new string[r.ReadInt()]; @@ -848,7 +856,7 @@ namespace Apache.Ignite.Core.Impl /// <summary> /// Gets the interop processor. /// </summary> - internal IUnmanagedTarget InteropProcessor + internal IPlatformTargetInternal InteropProcessor { get { return _proc; } } @@ -891,7 +899,7 @@ namespace Apache.Ignite.Core.Impl /// </summary> internal void ProcessorReleaseStart() { - InLongOutLong((int) Op.ReleaseStart, 0); + Target.InLongOutLong((int) Op.ReleaseStart, 0); } /// <summary> @@ -899,7 +907,7 @@ namespace Apache.Ignite.Core.Impl /// </summary> internal bool LoggerIsLevelEnabled(LogLevel logLevel) { - return InLongOutLong((int) Op.LoggerIsLevelEnabled, (long) logLevel) == True; + return Target.InLongOutLong((int) Op.LoggerIsLevelEnabled, (long) logLevel) == True; } /// <summary> @@ -907,7 +915,7 @@ namespace Apache.Ignite.Core.Impl /// </summary> internal void LoggerLog(LogLevel level, string msg, string category, string err) { - InStreamOutLong((int) Op.LoggerLog, w => + Target.InStreamOutLong((int) Op.LoggerLog, w => { w.WriteInt((int) level); w.WriteString(msg); @@ -921,7 +929,7 @@ namespace Apache.Ignite.Core.Impl /// </summary> internal IPlatformTarget GetExtension(int id) { - return InStreamOutObject((int) Op.GetExtension, w => w.WriteInt(id)); + return ((IPlatformTarget) Target).InStreamOutObject((int) Op.GetExtension, w => w.WriteInt(id)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs index 1b43438..e17bcbf 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs @@ -29,13 +29,12 @@ namespace Apache.Ignite.Core.Impl.Messaging using Apache.Ignite.Core.Impl.Collections; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Impl.Unmanaged; using Apache.Ignite.Core.Messaging; /// <summary> /// Messaging functionality. /// </summary> - internal class Messaging : PlatformTarget, IMessaging + internal class Messaging : PlatformTargetAdapter, IMessaging { /// <summary> /// Opcodes. @@ -67,10 +66,9 @@ namespace Apache.Ignite.Core.Impl.Messaging /// Initializes a new instance of the <see cref="Messaging" /> class. /// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> /// <param name="prj">Cluster group.</param> - public Messaging(IUnmanagedTarget target, Marshaller marsh, IClusterGroup prj) - : base(target, marsh) + public Messaging(IPlatformTargetInternal target, IClusterGroup prj) + : base(target) { Debug.Assert(prj != null); @@ -102,7 +100,7 @@ namespace Apache.Ignite.Core.Impl.Messaging { writer.Write(topic); - WriteEnumerable(writer, messages.OfType<object>()); + writer.WriteEnumerable(messages.OfType<object>()); }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs new file mode 100644 index 0000000..f884c40 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl +{ + using System; + + /// <summary> + /// PlatformTargetAdapter with IDisposable pattern. + /// </summary> + internal abstract class PlatformDisposableTargetAdapter : PlatformTargetAdapter, IDisposable + { + /** Disposed flag. */ + private volatile bool _disposed; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="target">Target.</param> + protected PlatformDisposableTargetAdapter(IPlatformTargetInternal target) : base(target) + { + // No-op. + } + + /** <inheritdoc /> */ + public void Dispose() + { + lock (this) + { + if (_disposed) + return; + + Dispose(true); + + GC.SuppressFinalize(this); + + _disposed = true; + } + } + + /// <summary> + /// Releases unmanaged and - optionally - managed resources. + /// </summary> + /// <param name="disposing"> + /// <c>true</c> when called from Dispose; <c>false</c> when called from finalizer. + /// </param> + protected virtual void Dispose(bool disposing) + { + Target.Dispose(); + } + + /// <summary> + /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed. + /// </summary> + protected void ThrowIfDisposed() + { + if (_disposed) + throw new ObjectDisposedException(GetType().Name, "Object has been disposed."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs new file mode 100644 index 0000000..725c112 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + using System.IO; + using System.Threading; + using System.Threading.Tasks; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Memory; + using Apache.Ignite.Core.Impl.Unmanaged; + using Apache.Ignite.Core.Interop; + using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader; + using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Base class for interop targets. + /// </summary> + internal class PlatformJniTarget : IPlatformTargetInternal + { + /** */ + private static readonly Dictionary<Type, FutureType> IgniteFutureTypeMap + = new Dictionary<Type, FutureType> + { + {typeof(bool), FutureType.Bool}, + {typeof(byte), FutureType.Byte}, + {typeof(char), FutureType.Char}, + {typeof(double), FutureType.Double}, + {typeof(float), FutureType.Float}, + {typeof(int), FutureType.Int}, + {typeof(long), FutureType.Long}, + {typeof(short), FutureType.Short} + }; + + /** Unmanaged target. */ + private readonly IUnmanagedTarget _target; + + /** Marshaller. */ + private readonly Marshaller _marsh; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + public PlatformJniTarget(IUnmanagedTarget target, Marshaller marsh) + { + Debug.Assert(target != null); + Debug.Assert(marsh != null); + + _target = target; + _marsh = marsh; + } + + /// <summary> + /// Gets the target. + /// </summary> + public IUnmanagedTarget Target + { + get { return _target; } + } + + /** <inheritdoc /> */ + public Marshaller Marshaller { get { return _marsh; } } + + /** <inheritdoc /> */ + public long InStreamOutLong(int type, Action<IBinaryStream> writeAction) + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + writeAction(stream); + + return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + } + } + + /** <inheritdoc /> */ + public IPlatformTargetInternal InStreamOutObject(int type, Action<IBinaryStream> writeAction) + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + writeAction(stream); + + var target = UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput()); + + return target == null ? null : new PlatformJniTarget(target, _marsh); + } + } + + /** <inheritdoc /> */ + public IPlatformTargetInternal OutObjectInternal(int type) + { + return GetPlatformTarget(UU.TargetOutObject(_target, type)); + } + + /** <inheritdoc /> */ + public T OutStream<T>(int type, Func<IBinaryStream, T> readAction) + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + UU.TargetOutStream(_target, type, stream.MemoryPointer); + + stream.SynchronizeInput(); + + return readAction(stream); + } + } + + /** <inheritdoc /> */ + public TR InStreamOutStream<TR>(int type, Action<IBinaryStream> writeAction, + Func<IBinaryStream, TR> readAction) + { + using (var outStream = IgniteManager.Memory.Allocate().GetStream()) + using (var inStream = IgniteManager.Memory.Allocate().GetStream()) + { + writeAction(outStream); + + UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + return readAction(inStream); + } + } + + /** <inheritdoc /> */ + public TR InStreamOutLong<TR>(int type, Action<IBinaryStream> outAction, Func<IBinaryStream, long, TR> inAction, + Func<IBinaryStream, Exception> readErrorAction) + { + Debug.Assert(readErrorAction != null); + + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + outAction(stream); + + var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + + if (res != PlatformTargetAdapter.Error && inAction == null) + return default(TR); // quick path for void operations + + stream.SynchronizeInput(); + + stream.Seek(0, SeekOrigin.Begin); + + if (res != PlatformTargetAdapter.Error) + { + return inAction != null ? inAction(stream, res) : default(TR); + } + + throw readErrorAction(stream); + } + } + + /** <inheritdoc /> */ + public unsafe TR InObjectStreamOutObjectStream<TR>(int type, Action<IBinaryStream> writeAction, + Func<IBinaryStream, IPlatformTargetInternal, TR> readAction, IPlatformTargetInternal arg) + { + PlatformMemoryStream outStream = null; + long outPtr = 0; + + PlatformMemoryStream inStream = null; + long inPtr = 0; + + try + { + if (writeAction != null) + { + outStream = IgniteManager.Memory.Allocate().GetStream(); + writeAction(outStream); + outPtr = outStream.SynchronizeOutput(); + } + + if (readAction != null) + { + inStream = IgniteManager.Memory.Allocate().GetStream(); + inPtr = inStream.MemoryPointer; + } + + var res = UU.TargetInObjectStreamOutObjectStream(_target, type, + ((PlatformJniTarget)arg).Target.Target, outPtr, inPtr); + + if (readAction == null) + return default(TR); + + inStream.SynchronizeInput(); + + var target = res == null ? null : new PlatformJniTarget(res, _marsh); + + return readAction(inStream, target); + + } + finally + { + try + { + if (inStream != null) + inStream.Dispose(); + + } + finally + { + if (outStream != null) + outStream.Dispose(); + } + } + } + + /// <summary> + /// Finish marshaling. + /// </summary> + /// <param name="writer">Writer.</param> + private void FinishMarshal(BinaryWriter writer) + { + _marsh.FinishMarshal(writer); + } + + /// <summary> + /// Creates a future and starts listening. + /// </summary> + /// <typeparam name="T">Future result type</typeparam> + /// <param name="listenAction">The listen action.</param> + /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param> + /// <param name="convertFunc">The function to read future result from stream.</param> + /// <returns>Created future.</returns> + private Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false, + Func<BinaryReader, T> convertFunc = null) + { + var futType = FutureType.Object; + + var type = typeof(T); + + if (type.IsPrimitive) + IgniteFutureTypeMap.TryGetValue(type, out futType); + + var fut = convertFunc == null && futType != FutureType.Object + ? new Future<T>() + : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc)); + + var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut); + + IUnmanagedTarget futTarget; + + try + { + futTarget = listenAction(futHnd, (int)futType); + } + catch (Exception) + { + _marsh.Ignite.HandleRegistry.Release(futHnd); + + throw; + } + + fut.SetTarget(new Listenable(new PlatformJniTarget(futTarget, _marsh))); + + return fut; + } + + /// <summary> + /// Creates a future and starts listening. + /// </summary> + /// <typeparam name="T">Future result type</typeparam> + /// <param name="listenAction">The listen action.</param> + /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param> + /// <param name="convertFunc">The function to read future result from stream.</param> + /// <returns>Created future.</returns> + private Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false, + Func<BinaryReader, T> convertFunc = null) + { + var futType = FutureType.Object; + + var type = typeof(T); + + if (type.IsPrimitive) + IgniteFutureTypeMap.TryGetValue(type, out futType); + + var fut = convertFunc == null && futType != FutureType.Object + ? new Future<T>() + : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc)); + + var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut); + + try + { + listenAction(futHnd, (int)futType); + } + catch (Exception) + { + _marsh.Ignite.HandleRegistry.Release(futHnd); + + throw; + } + + return fut; + } + + #region IPlatformTarget + + /** <inheritdoc /> */ + public long InLongOutLong(int type, long val) + { + return UU.TargetInLongOutLong(_target, type, val); + } + + /** <inheritdoc /> */ + public long InStreamOutLong(int type, Action<IBinaryRawWriter> writeAction) + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = _marsh.StartMarshal(stream); + + writeAction(writer); + + FinishMarshal(writer); + + return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + } + } + + /** <inheritdoc /> */ + public T InStreamOutStream<T>(int type, Action<IBinaryRawWriter> writeAction, + Func<IBinaryRawReader, T> readAction) + { + using (var outStream = IgniteManager.Memory.Allocate().GetStream()) + using (var inStream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = _marsh.StartMarshal(outStream); + + writeAction(writer); + + FinishMarshal(writer); + + UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); + + inStream.SynchronizeInput(); + + return readAction(_marsh.StartUnmarshal(inStream)); + } + } + + /** <inheritdoc /> */ + public IPlatformTarget InStreamOutObject(int type, Action<IBinaryRawWriter> writeAction) + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = _marsh.StartMarshal(stream); + + writeAction(writer); + + FinishMarshal(writer); + + return GetPlatformTarget(UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput())); + } + } + + /** <inheritdoc /> */ + public unsafe T InObjectStreamOutObjectStream<T>(int type, IPlatformTarget arg, + Action<IBinaryRawWriter> writeAction, Func<IBinaryRawReader, IPlatformTarget, T> readAction) + { + PlatformMemoryStream outStream = null; + long outPtr = 0; + + PlatformMemoryStream inStream = null; + long inPtr = 0; + + try + { + if (writeAction != null) + { + outStream = IgniteManager.Memory.Allocate().GetStream(); + var writer = _marsh.StartMarshal(outStream); + writeAction(writer); + FinishMarshal(writer); + outPtr = outStream.SynchronizeOutput(); + } + + if (readAction != null) + { + inStream = IgniteManager.Memory.Allocate().GetStream(); + inPtr = inStream.MemoryPointer; + } + + var res = UU.TargetInObjectStreamOutObjectStream(_target, type, GetTargetPtr(arg), outPtr, inPtr); + + if (readAction == null) + return default(T); + + inStream.SynchronizeInput(); + + return readAction(_marsh.StartUnmarshal(inStream), GetPlatformTarget(res)); + + } + finally + { + try + { + if (inStream != null) + inStream.Dispose(); + + } + finally + { + if (outStream != null) + outStream.Dispose(); + } + } + } + + /** <inheritdoc /> */ + public T OutStream<T>(int type, Func<IBinaryRawReader, T> readAction) + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + UU.TargetOutStream(_target, type, stream.MemoryPointer); + + stream.SynchronizeInput(); + + return readAction(_marsh.StartUnmarshal(stream)); + } + } + + /** <inheritdoc /> */ + public IPlatformTarget OutObject(int type) + { + return OutObjectInternal(type); + } + + /** <inheritdoc /> */ + public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction = null, + Func<IBinaryRawReader, T> readAction = null) + { + var convertFunc = readAction != null + ? r => readAction(r) + : (Func<BinaryReader, T>)null; + return GetFuture((futId, futType) => + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + stream.WriteLong(futId); + stream.WriteInt(futType); + + if (writeAction != null) + { + var writer = _marsh.StartMarshal(stream); + + writeAction(writer); + + FinishMarshal(writer); + } + + UU.TargetInStreamAsync(_target, type, stream.SynchronizeOutput()); + } + }, false, convertFunc).Task; + } + + /** <inheritdoc /> */ + public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction, + Func<IBinaryRawReader, T> readAction, CancellationToken cancellationToken) + { + var convertFunc = readAction != null + ? r => readAction(r) + : (Func<BinaryReader, T>) null; + + return GetFuture((futId, futType) => + { + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + stream.WriteLong(futId); + stream.WriteInt(futType); + + if (writeAction != null) + { + var writer = _marsh.StartMarshal(stream); + + writeAction(writer); + + FinishMarshal(writer); + } + + return UU.TargetInStreamOutObjectAsync(_target, type, stream.SynchronizeOutput()); + } + }, false, convertFunc).GetTask(cancellationToken); + } + + /// <summary> + /// Gets the platform target. + /// </summary> + private IPlatformTargetInternal GetPlatformTarget(IUnmanagedTarget target) + { + return target == null ? null : new PlatformJniTarget(target, _marsh); + } + + /// <summary> + /// Gets the target pointer. + /// </summary> + private static unsafe void* GetTargetPtr(IPlatformTarget target) + { + return target == null ? null : ((PlatformJniTarget) target)._target.Target; + } + + #endregion + + /** <inheritdoc /> */ + [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly", + Justification = "There is no finalizer.")] + public void Dispose() + { + if (_target != null) + { + _target.Dispose(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs deleted file mode 100644 index 474af0e..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ /dev/null @@ -1,1086 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl -{ - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using System.IO; - using System.Threading.Tasks; - using Apache.Ignite.Core.Binary; - using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Binary.IO; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Memory; - using Apache.Ignite.Core.Impl.Unmanaged; - using Apache.Ignite.Core.Interop; - using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader; - using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; - - /// <summary> - /// Base class for interop targets. - /// </summary> - [SuppressMessage("ReSharper", "LocalVariableHidesMember")] - internal class PlatformTarget : IPlatformTarget - { - /** */ - protected const int False = 0; - - /** */ - protected const int True = 1; - - /** */ - protected const int Error = -1; - - /** */ - public const int OpNone = -2; - - /** */ - private static readonly Dictionary<Type, FutureType> IgniteFutureTypeMap - = new Dictionary<Type, FutureType> - { - {typeof(bool), FutureType.Bool}, - {typeof(byte), FutureType.Byte}, - {typeof(char), FutureType.Char}, - {typeof(double), FutureType.Double}, - {typeof(float), FutureType.Float}, - {typeof(int), FutureType.Int}, - {typeof(long), FutureType.Long}, - {typeof(short), FutureType.Short} - }; - - /** Unmanaged target. */ - private readonly IUnmanagedTarget _target; - - /** Marshaller. */ - private readonly Marshaller _marsh; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - public PlatformTarget(IUnmanagedTarget target, Marshaller marsh) - { - Debug.Assert(target != null); - Debug.Assert(marsh != null); - - _target = target; - _marsh = marsh; - } - - /// <summary> - /// Unmanaged target. - /// </summary> - internal IUnmanagedTarget Target - { - get { return _target; } - } - - /// <summary> - /// Marshaller. - /// </summary> - internal Marshaller Marshaller - { - get { return _marsh; } - } - - #region Static Helpers - - /// <summary> - /// Write collection. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="vals">Values.</param> - /// <returns>The same writer for chaining.</returns> - protected static BinaryWriter WriteCollection<T>(BinaryWriter writer, ICollection<T> vals) - { - return WriteCollection<T, T>(writer, vals, null); - } - - /// <summary> - /// Write nullable collection. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="vals">Values.</param> - /// <returns>The same writer for chaining.</returns> - protected static BinaryWriter WriteNullableCollection<T>(BinaryWriter writer, ICollection<T> vals) - { - return WriteNullable(writer, vals, WriteCollection); - } - - /// <summary> - /// Write collection. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="vals">Values.</param> - /// <param name="selector">A transform function to apply to each element.</param> - /// <returns>The same writer for chaining.</returns> - protected static BinaryWriter WriteCollection<T1, T2>(BinaryWriter writer, - ICollection<T1> vals, Func<T1, T2> selector) - { - writer.WriteInt(vals.Count); - - if (selector == null) - { - foreach (var val in vals) - writer.Write(val); - } - else - { - foreach (var val in vals) - writer.Write(selector(val)); - } - - return writer; - } - - /// <summary> - /// Write enumerable. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="vals">Values.</param> - /// <returns>The same writer for chaining.</returns> - protected static BinaryWriter WriteEnumerable<T>(BinaryWriter writer, IEnumerable<T> vals) - { - return WriteEnumerable<T, T>(writer, vals, null); - } - - /// <summary> - /// Write enumerable. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="vals">Values.</param> - /// <param name="selector">A transform function to apply to each element.</param> - /// <returns>The same writer for chaining.</returns> - protected static BinaryWriter WriteEnumerable<T1, T2>(BinaryWriter writer, - IEnumerable<T1> vals, Func<T1, T2> selector) - { - var col = vals as ICollection<T1>; - - if (col != null) - return WriteCollection(writer, col, selector); - - var stream = writer.Stream; - - var pos = stream.Position; - - stream.Seek(4, SeekOrigin.Current); - - var size = 0; - - if (selector == null) - { - foreach (var val in vals) - { - writer.Write(val); - - size++; - } - } - else - { - foreach (var val in vals) - { - writer.Write(selector(val)); - - size++; - } - } - - stream.WriteInt(pos, size); - - return writer; - } - - /// <summary> - /// Write dictionary. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="vals">Values.</param> - protected static void WriteDictionary<T1, T2>(BinaryWriter writer, IEnumerable<KeyValuePair<T1, T2>> vals) - { - var pos = writer.Stream.Position; - writer.WriteInt(0); // Reserve count. - - int cnt = 0; - - foreach (var pair in vals) - { - writer.Write(pair.Key); - writer.Write(pair.Value); - - cnt++; - } - - writer.Stream.WriteInt(pos, cnt); - } - - /// <summary> - /// Write a nullable item. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="item">Item.</param> - /// <param name="writeItem">Write action to perform on item when it is not null.</param> - /// <returns>The same writer for chaining.</returns> - private static BinaryWriter WriteNullable<T>(BinaryWriter writer, T item, - Func<BinaryWriter, T, BinaryWriter> writeItem) where T : class - { - if (item == null) - { - writer.WriteBoolean(false); - - return writer; - } - - writer.WriteBoolean(true); - - return writeItem(writer, item); - } - - #endregion - - #region OUT operations - - /// <summary> - /// Perform out operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="action">Action to be performed on the stream.</param> - /// <returns></returns> - protected long DoOutOp(int type, Action<IBinaryStream> action) - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - action(stream); - - return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); - } - } - - /// <summary> - /// Perform out operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="action">Action to be performed on the stream.</param> - /// <returns></returns> - protected long DoOutOp(int type, Action<BinaryWriter> action) - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - var writer = _marsh.StartMarshal(stream); - - action(writer); - - FinishMarshal(writer); - - return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); - } - } - - /// <summary> - /// Perform out operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="action">Action to be performed on the stream.</param> - /// <returns>Resulting object.</returns> - protected IUnmanagedTarget DoOutOpObject(int type, Action<BinaryWriter> action) - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - var writer = _marsh.StartMarshal(stream); - - action(writer); - - FinishMarshal(writer); - - return UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput()); - } - } - - /// <summary> - /// Perform out operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="action">Action to be performed on the stream.</param> - /// <returns>Resulting object.</returns> - protected IUnmanagedTarget DoOutOpObject(int type, Action<IBinaryStream> action) - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - action(stream); - - return UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput()); - } - } - - /// <summary> - /// Perform out operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <returns>Resulting object.</returns> - protected IUnmanagedTarget DoOutOpObject(int type) - { - return UU.TargetOutObject(_target, type); - } - - /// <summary> - /// Perform simple output operation accepting single argument. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val1">Value.</param> - /// <returns>Result.</returns> - protected long DoOutOp<T1>(int type, T1 val1) - { - return DoOutOp(type, writer => - { - writer.Write(val1); - }); - } - - /// <summary> - /// Perform simple output operation accepting two arguments. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val1">Value 1.</param> - /// <param name="val2">Value 2.</param> - /// <returns>Result.</returns> - protected long DoOutOp<T1, T2>(int type, T1 val1, T2 val2) - { - return DoOutOp(type, writer => - { - writer.Write(val1); - writer.Write(val2); - }); - } - - /// <summary> - /// Perform simple output operation accepting three arguments. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val1">Value 1.</param> - /// <param name="val2">Value 2.</param> - /// <param name="val3">Value 3.</param> - /// <returns>Result.</returns> - protected long DoOutOp<T1, T2, T3>(int type, T1 val1, T2 val2, T3 val3) - { - return DoOutOp(type, writer => - { - writer.Write(val1); - writer.Write(val2); - writer.Write(val3); - }); - } - - #endregion - - #region IN operations - - /// <summary> - /// Perform in operation. - /// </summary> - /// <param name="type">Type.</param> - /// <param name="action">Action.</param> - protected void DoInOp(int type, Action<IBinaryStream> action) - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - UU.TargetOutStream(_target, type, stream.MemoryPointer); - - stream.SynchronizeInput(); - - action(stream); - } - } - - /// <summary> - /// Perform in operation. - /// </summary> - /// <param name="type">Type.</param> - /// <param name="action">Action.</param> - /// <returns>Result.</returns> - protected T DoInOp<T>(int type, Func<IBinaryStream, T> action) - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - UU.TargetOutStream(_target, type, stream.MemoryPointer); - - stream.SynchronizeInput(); - - return action(stream); - } - } - - /// <summary> - /// Perform simple in operation returning immediate result. - /// </summary> - /// <param name="type">Type.</param> - /// <returns>Result.</returns> - protected T DoInOp<T>(int type) - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - UU.TargetOutStream(_target, type, stream.MemoryPointer); - - stream.SynchronizeInput(); - - return Unmarshal<T>(stream); - } - } - - #endregion - - #region OUT-IN operations - - /// <summary> - /// Perform out-in operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="outAction">Out action.</param> - /// <param name="inAction">In action.</param> - protected void DoOutInOp(int type, Action<BinaryWriter> outAction, Action<IBinaryStream> inAction) - { - using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream()) - { - using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream()) - { - BinaryWriter writer = _marsh.StartMarshal(outStream); - - outAction(writer); - - FinishMarshal(writer); - - UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); - - inStream.SynchronizeInput(); - - inAction(inStream); - } - } - } - - /// <summary> - /// Perform out-in operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="outAction">Out action.</param> - /// <param name="inAction">In action.</param> - /// <returns>Result.</returns> - protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, TR> inAction) - { - using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream()) - { - using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream()) - { - BinaryWriter writer = _marsh.StartMarshal(outStream); - - outAction(writer); - - FinishMarshal(writer); - - UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); - - inStream.SynchronizeInput(); - - return inAction(inStream); - } - } - } - - /// <summary> - /// Perform out-in operation with a single stream. - /// </summary> - /// <typeparam name="TR">The type of the r.</typeparam> - /// <param name="type">Operation type.</param> - /// <param name="outAction">Out action.</param> - /// <param name="inAction">In action.</param> - /// <param name="inErrorAction">The action to read an error.</param> - /// <returns> - /// Result. - /// </returns> - protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction, - Func<IBinaryStream, Exception> inErrorAction) - { - Debug.Assert(inErrorAction != null); - - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - var writer = _marsh.StartMarshal(stream); - - outAction(writer); - - FinishMarshal(writer); - - var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); - - if (res != Error && inAction == null) - return default(TR); // quick path for void operations - - stream.SynchronizeInput(); - - stream.Seek(0, SeekOrigin.Begin); - - if (res != Error) - return inAction != null ? inAction(stream, res) : default(TR); - - throw inErrorAction(stream); - } - } - - /// <summary> - /// Perform out-in operation with a single stream. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="outAction">Out action.</param> - /// <param name="inErrorAction">The action to read an error.</param> - /// <returns> - /// Result. - /// </returns> - protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction, - Func<IBinaryStream, Exception> inErrorAction) - { - Debug.Assert(inErrorAction != null); - - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - var writer = _marsh.StartMarshal(stream); - - outAction(writer); - - FinishMarshal(writer); - - var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); - - if (res != Error) - return res == True; - - stream.SynchronizeInput(); - - stream.Seek(0, SeekOrigin.Begin); - - throw inErrorAction(stream); - } - } - - /// <summary> - /// Perform out-in operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="outAction">Out action.</param> - /// <param name="inAction">In action.</param> - /// <param name="arg">Argument.</param> - /// <returns>Result.</returns> - protected unsafe TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction, - Func<IBinaryStream, IUnmanagedTarget, TR> inAction, void* arg) - { - PlatformMemoryStream outStream = null; - long outPtr = 0; - - PlatformMemoryStream inStream = null; - long inPtr = 0; - - try - { - if (outAction != null) - { - outStream = IgniteManager.Memory.Allocate().GetStream(); - var writer = _marsh.StartMarshal(outStream); - outAction(writer); - FinishMarshal(writer); - outPtr = outStream.SynchronizeOutput(); - } - - if (inAction != null) - { - inStream = IgniteManager.Memory.Allocate().GetStream(); - inPtr = inStream.MemoryPointer; - } - - var res = UU.TargetInObjectStreamOutObjectStream(_target, type, arg, outPtr, inPtr); - - if (inAction == null) - return default(TR); - - inStream.SynchronizeInput(); - - return inAction(inStream, res); - - } - finally - { - try - { - if (inStream != null) - inStream.Dispose(); - - } - finally - { - if (outStream != null) - outStream.Dispose(); - } - } - } - - /// <summary> - /// Perform out-in operation. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="outAction">Out action.</param> - /// <returns>Result.</returns> - protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction) - { - using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream()) - { - using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream()) - { - BinaryWriter writer = _marsh.StartMarshal(outStream); - - outAction(writer); - - FinishMarshal(writer); - - UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); - - inStream.SynchronizeInput(); - - return Unmarshal<TR>(inStream); - } - } - } - - /// <summary> - /// Perform simple out-in operation accepting single argument. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val">Value.</param> - /// <returns>Result.</returns> - protected TR DoOutInOp<T1, TR>(int type, T1 val) - { - using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream()) - { - using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream()) - { - BinaryWriter writer = _marsh.StartMarshal(outStream); - - writer.WriteObject(val); - - FinishMarshal(writer); - - UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); - - inStream.SynchronizeInput(); - - return Unmarshal<TR>(inStream); - } - } - } - - /// <summary> - /// Perform simple out-in operation accepting two arguments. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val1">Value.</param> - /// <param name="val2">Value.</param> - /// <returns>Result.</returns> - protected TR DoOutInOp<T1, T2, TR>(int type, T1 val1, T2 val2) - { - using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream()) - { - using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream()) - { - BinaryWriter writer = _marsh.StartMarshal(outStream); - - writer.WriteObject(val1); - writer.WriteObject(val2); - - FinishMarshal(writer); - - UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer); - - inStream.SynchronizeInput(); - - return Unmarshal<TR>(inStream); - } - } - } - - /// <summary> - /// Perform simple out-in operation accepting two arguments. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="val">Value.</param> - /// <returns>Result.</returns> - protected long DoOutInOp(int type, long val = 0) - { - return UU.TargetInLongOutLong(_target, type, val); - } - - #endregion - - #region Async operations - - /// <summary> - /// Performs async operation. - /// </summary> - /// <param name="type">The type code.</param> - /// <param name="writeAction">The write action.</param> - /// <returns>Task for async operation</returns> - protected Task DoOutOpAsync(int type, Action<BinaryWriter> writeAction = null) - { - return DoOutOpAsync<object>(type, writeAction); - } - - /// <summary> - /// Performs async operation. - /// </summary> - /// <typeparam name="T">Type of the result.</typeparam> - /// <param name="type">The type code.</param> - /// <param name="writeAction">The write action.</param> - /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param> - /// <param name="convertFunc">The function to read future result from stream.</param> - /// <returns>Task for async operation</returns> - protected Task<T> DoOutOpAsync<T>(int type, Action<BinaryWriter> writeAction = null, bool keepBinary = false, - Func<BinaryReader, T> convertFunc = null) - { - return GetFuture((futId, futType) => DoOutOp(type, w => - { - if (writeAction != null) - writeAction(w); - w.WriteLong(futId); - w.WriteInt(futType); - }), keepBinary, convertFunc).Task; - } - - /// <summary> - /// Performs async operation. - /// </summary> - /// <typeparam name="T">Type of the result.</typeparam> - /// <param name="type">The type code.</param> - /// <param name="writeAction">The write action.</param> - /// <returns>Future for async operation</returns> - protected Future<T> DoOutOpObjectAsync<T>(int type, Action<IBinaryRawWriter> writeAction) - { - return GetFuture<T>((futId, futType) => DoOutOpObject(type, w => - { - writeAction(w); - w.WriteLong(futId); - w.WriteInt(futType); - })); - } - - /// <summary> - /// Performs async operation. - /// </summary> - /// <typeparam name="TR">Type of the result.</typeparam> - /// <typeparam name="T1">The type of the first arg.</typeparam> - /// <param name="type">The type code.</param> - /// <param name="val1">First arg.</param> - /// <returns> - /// Task for async operation - /// </returns> - protected Task<TR> DoOutOpAsync<T1, TR>(int type, T1 val1) - { - return GetFuture<TR>((futId, futType) => DoOutOp(type, w => - { - w.WriteObject(val1); - w.WriteLong(futId); - w.WriteInt(futType); - })).Task; - } - - /// <summary> - /// Performs async operation. - /// </summary> - /// <typeparam name="TR">Type of the result.</typeparam> - /// <typeparam name="T1">The type of the first arg.</typeparam> - /// <typeparam name="T2">The type of the second arg.</typeparam> - /// <param name="type">The type code.</param> - /// <param name="val1">First arg.</param> - /// <param name="val2">Second arg.</param> - /// <returns> - /// Task for async operation - /// </returns> - protected Task<TR> DoOutOpAsync<T1, T2, TR>(int type, T1 val1, T2 val2) - { - return GetFuture<TR>((futId, futType) => DoOutOp(type, w => - { - w.WriteObject(val1); - w.WriteObject(val2); - w.WriteLong(futId); - w.WriteInt(futType); - })).Task; - } - - #endregion - - #region Miscelanneous - - /// <summary> - /// Finish marshaling. - /// </summary> - /// <param name="writer">Writer.</param> - internal void FinishMarshal(BinaryWriter writer) - { - _marsh.FinishMarshal(writer); - } - - /// <summary> - /// Unmarshal object using the given stream. - /// </summary> - /// <param name="stream">Stream.</param> - /// <returns>Unmarshalled object.</returns> - protected virtual T Unmarshal<T>(IBinaryStream stream) - { - return _marsh.Unmarshal<T>(stream); - } - - /// <summary> - /// Creates a future and starts listening. - /// </summary> - /// <typeparam name="T">Future result type</typeparam> - /// <param name="listenAction">The listen action.</param> - /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param> - /// <param name="convertFunc">The function to read future result from stream.</param> - /// <returns>Created future.</returns> - private Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false, - Func<BinaryReader, T> convertFunc = null) - { - var futType = FutureType.Object; - - var type = typeof(T); - - if (type.IsPrimitive) - IgniteFutureTypeMap.TryGetValue(type, out futType); - - var fut = convertFunc == null && futType != FutureType.Object - ? new Future<T>() - : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc)); - - var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut); - - IUnmanagedTarget futTarget; - - try - { - futTarget = listenAction(futHnd, (int)futType); - } - catch (Exception) - { - _marsh.Ignite.HandleRegistry.Release(futHnd); - - throw; - } - - fut.SetTarget(new Listenable(futTarget, _marsh)); - - return fut; - } - - /// <summary> - /// Creates a future and starts listening. - /// </summary> - /// <typeparam name="T">Future result type</typeparam> - /// <param name="listenAction">The listen action.</param> - /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param> - /// <param name="convertFunc">The function to read future result from stream.</param> - /// <returns>Created future.</returns> - protected Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false, - Func<BinaryReader, T> convertFunc = null) - { - var futType = FutureType.Object; - - var type = typeof(T); - - if (type.IsPrimitive) - IgniteFutureTypeMap.TryGetValue(type, out futType); - - var fut = convertFunc == null && futType != FutureType.Object - ? new Future<T>() - : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc)); - - var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut); - - try - { - listenAction(futHnd, (int)futType); - } - catch (Exception) - { - _marsh.Ignite.HandleRegistry.Release(futHnd); - - throw; - } - - return fut; - } - - #endregion - - #region IPlatformTarget - - /** <inheritdoc /> */ - public long InLongOutLong(int type, long val) - { - return DoOutInOp(type, val); - } - - /** <inheritdoc /> */ - public long InStreamOutLong(int type, Action<IBinaryRawWriter> writeAction) - { - return DoOutOp(type, writer => writeAction(writer)); - } - - /** <inheritdoc /> */ - public T InStreamOutStream<T>(int type, Action<IBinaryRawWriter> writeAction, - Func<IBinaryRawReader, T> readAction) - { - return DoOutInOp(type, writeAction, stream => readAction(Marshaller.StartUnmarshal(stream))); - } - - /** <inheritdoc /> */ - public IPlatformTarget InStreamOutObject(int type, Action<IBinaryRawWriter> writeAction) - { - return GetPlatformTarget(DoOutOpObject(type, writeAction)); - } - - /** <inheritdoc /> */ - public unsafe T InObjectStreamOutObjectStream<T>(int type, IPlatformTarget arg, Action<IBinaryRawWriter> writeAction, - Func<IBinaryRawReader, IPlatformTarget, T> readAction) - { - return DoOutInOp(type, writeAction, (stream, obj) => readAction(Marshaller.StartUnmarshal(stream), - GetPlatformTarget(obj)), GetTargetPtr(arg)); - } - - /** <inheritdoc /> */ - public T OutStream<T>(int type, Func<IBinaryRawReader, T> readAction) - { - return DoInOp(type, stream => readAction(Marshaller.StartUnmarshal(stream))); - } - - /** <inheritdoc /> */ - public IPlatformTarget OutObject(int type) - { - return GetPlatformTarget(DoOutOpObject(type)); - } - - /** <inheritdoc /> */ - public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction = null, - Func<IBinaryRawReader, T> readAction = null) - { - var convertFunc = readAction != null - ? r => readAction(r) - : (Func<BinaryReader, T>) null; - - return GetFuture((futId, futType) => - { - using (var stream = IgniteManager.Memory.Allocate().GetStream()) - { - stream.WriteLong(futId); - stream.WriteInt(futType); - - if (writeAction != null) - { - var writer = _marsh.StartMarshal(stream); - - writeAction(writer); - - FinishMarshal(writer); - } - - UU.TargetInStreamAsync(_target, type, stream.SynchronizeOutput()); - } - }, false, convertFunc).Task; - } - - /// <summary> - /// Gets the platform target. - /// </summary> - private IPlatformTarget GetPlatformTarget(IUnmanagedTarget target) - { - return target == null ? null : new PlatformTarget(target, Marshaller); - } - - /// <summary> - /// Gets the target pointer. - /// </summary> - private static unsafe void* GetTargetPtr(IPlatformTarget target) - { - return target == null ? null : ((PlatformTarget) target).Target.Target; - } - - #endregion - } - - /// <summary> - /// PlatformTarget with IDisposable pattern. - /// </summary> - internal abstract class PlatformDisposableTarget : PlatformTarget, IDisposable - { - /** Disposed flag. */ - private volatile bool _disposed; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - protected PlatformDisposableTarget(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh) - { - // No-op. - } - - /** <inheritdoc /> */ - public void Dispose() - { - lock (this) - { - if (_disposed) - return; - - Dispose(true); - - GC.SuppressFinalize(this); - - _disposed = true; - } - } - - /// <summary> - /// Releases unmanaged and - optionally - managed resources. - /// </summary> - /// <param name="disposing"> - /// <c>true</c> when called from Dispose; <c>false</c> when called from finalizer. - /// </param> - protected virtual void Dispose(bool disposing) - { - Target.Dispose(); - } - - /// <summary> - /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed. - /// </summary> - protected void ThrowIfDisposed() - { - if (_disposed) - throw new ObjectDisposedException(GetType().Name, "Object has been disposed."); - } - } -}
